diff options
author | Valentin Anger <syrupthinker@gryphno.de> | 2020-06-01 20:20:47 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-01 14:20:47 -0400 |
commit | becbb56b19e96e4dd72b861217a855fba953d290 (patch) | |
tree | d9e99771c537ef87a4a945f0120775c337ef90aa /core | |
parent | 12d741c2fe453625d510313beaa2e1c282784c00 (diff) |
feat(core): Ops can take several zero copy buffers (#4788)
Diffstat (limited to 'core')
-rw-r--r-- | core/bindings.rs | 45 | ||||
-rw-r--r-- | core/core.js | 8 | ||||
-rw-r--r-- | core/core_isolate.rs | 65 | ||||
-rw-r--r-- | core/es_isolate.rs | 2 | ||||
-rw-r--r-- | core/examples/http_bench.js | 8 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 30 | ||||
-rw-r--r-- | core/ops.rs | 10 | ||||
-rw-r--r-- | core/plugin_api.rs | 3 | ||||
-rw-r--r-- | core/zero_copy_buf.rs | 1 |
9 files changed, 130 insertions, 42 deletions
diff --git a/core/bindings.rs b/core/bindings.rs index 7bcb99a38..b6390073a 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -452,17 +452,50 @@ fn send( Err(_) => &[], }; - let zero_copy: Option<ZeroCopyBuf> = - v8::Local::<v8::ArrayBufferView>::try_from(args.get(2)) - .map(ZeroCopyBuf::new) - .ok(); - let state_rc = CoreIsolate::state(scope.isolate()); let mut state = state_rc.borrow_mut(); assert!(!state.global_context.is_empty()); + let mut buf_iter = (2..args.length()).map(|idx| { + v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx)) + .map(ZeroCopyBuf::new) + .map_err(|err| { + let msg = format!("Invalid argument at position {}: {}", idx, err); + let msg = v8::String::new(scope, &msg).unwrap(); + v8::Exception::type_error(scope, msg) + }) + }); + + let mut buf_one: ZeroCopyBuf; + let mut buf_vec: Vec<ZeroCopyBuf>; + + // Collect all ArrayBufferView's + let buf_iter_result = match buf_iter.len() { + 0 => Ok(&mut [][..]), + 1 => match buf_iter.next().unwrap() { + Ok(buf) => { + buf_one = buf; + Ok(std::slice::from_mut(&mut buf_one)) + } + Err(err) => Err(err), + }, + _ => match buf_iter.collect::<Result<Vec<_>, _>>() { + Ok(v) => { + buf_vec = v; + Ok(&mut buf_vec[..]) + } + Err(err) => Err(err), + }, + }; + // If response is empty then it's either async op or exception was thrown - let maybe_response = state.dispatch_op(scope, op_id, control, zero_copy); + let maybe_response = match buf_iter_result { + Ok(bufs) => state.dispatch_op(scope, op_id, control, bufs), + Err(exc) => { + scope.isolate().throw_exception(exc); + return; + } + }; if let Some(response) = maybe_response { // Synchronous response. diff --git a/core/core.js b/core/core.js index 4c6f708bb..23cc325ab 100644 --- a/core/core.js +++ b/core/core.js @@ -59,7 +59,7 @@ SharedQueue Binary Layout function ops() { // op id 0 is a special value to retrieve the map of registered ops. - const opsMapBytes = send(0, new Uint8Array([]), null); + const opsMapBytes = send(0, new Uint8Array([])); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); return JSON.parse(opsMapJson); } @@ -181,13 +181,9 @@ SharedQueue Binary Layout } } - function dispatch(opId, control, zeroCopy = null) { - return send(opId, control, zeroCopy); - } - Object.assign(window.Deno.core, { setAsyncHandler, - dispatch, + dispatch: send, ops, // sharedQueue is private but exposed for testing. sharedQueue: { diff --git a/core/core_isolate.rs b/core/core_isolate.rs index dff887ab3..5ddc6571b 100644 --- a/core/core_isolate.rs +++ b/core/core_isolate.rs @@ -366,7 +366,7 @@ impl CoreIsolate { /// Requires runtime to explicitly ask for op ids before using any of the ops. pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId where - F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static, + F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static, { let state_rc = Self::state(self); let mut state = state_rc.borrow_mut(); @@ -484,7 +484,7 @@ impl CoreIsolateState { /// Requires runtime to explicitly ask for op ids before using any of the ops. pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId where - F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static, + F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static, { self.op_registry.register(name, op) } @@ -504,10 +504,10 @@ impl CoreIsolateState { scope: &mut impl v8::ToLocal<'s>, op_id: OpId, control_buf: &[u8], - zero_copy_buf: Option<ZeroCopyBuf>, + zero_copy_bufs: &mut [ZeroCopyBuf], ) -> Option<(OpId, Box<[u8]>)> { let op = if let Some(dispatcher) = self.op_registry.get(op_id) { - dispatcher(self, control_buf, zero_copy_buf) + dispatcher(self, control_buf, zero_copy_bufs) } else { let message = v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap(); @@ -718,6 +718,7 @@ pub mod tests { pub enum Mode { Async, AsyncUnref, + AsyncZeroCopy(u8), OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -732,7 +733,7 @@ pub mod tests { let dispatcher = move |_state: &mut CoreIsolateState, control: &[u8], - _zero_copy: Option<ZeroCopyBuf>| + zero_copy: &mut [ZeroCopyBuf]| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { @@ -752,6 +753,18 @@ pub mod tests { }; Op::AsyncUnref(fut.boxed()) } + Mode::AsyncZeroCopy(count) => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 24); + assert_eq!(zero_copy.len(), count as usize); + zero_copy.iter().enumerate().for_each(|(idx, buf)| { + assert_eq!(buf.len(), 1); + assert_eq!(idx, buf[0] as usize); + }); + + let buf = vec![43u8].into_boxed_slice(); + Op::Async(futures::future::ready(buf).boxed()) + } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); @@ -817,6 +830,48 @@ pub mod tests { } #[test] + fn test_dispatch_no_zero_copy_buf() { + let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(0)); + js_check(isolate.execute( + "filename.js", + r#" + let control = new Uint8Array([24]); + Deno.core.send(1, control); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_dispatch_one_zero_copy_buf() { + let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(1)); + js_check(isolate.execute( + "filename.js", + r#" + let control = new Uint8Array([24]); + let zero_copy = new Uint8Array([0]); + Deno.core.send(1, control, zero_copy); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_dispatch_two_zero_copy_bufs() { + let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(2)); + js_check(isolate.execute( + "filename.js", + r#" + let control = new Uint8Array([24]); + let zero_copy_a = new Uint8Array([0]); + let zero_copy_b = new Uint8Array([1]); + Deno.core.send(1, control, zero_copy_a, zero_copy_b); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + } + + #[test] fn test_poll_async_delayed_ops() { run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::Async); diff --git a/core/es_isolate.rs b/core/es_isolate.rs index a23af43d7..35cf177f8 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -712,7 +712,7 @@ pub mod tests { let dispatcher = move |_state: &mut CoreIsolateState, control: &[u8], - _zero_copy: Option<ZeroCopyBuf>| + _zero_copy: &mut [ZeroCopyBuf]| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); assert_eq!(control.len(), 1); diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index d9878cbe7..a893dab40 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -36,18 +36,18 @@ const scratchBytes = new Uint8Array( ); assert(scratchBytes.byteLength === 3 * 4); -function send(promiseId, opId, rid, zeroCopy = null) { +function send(promiseId, opId, rid, ...zeroCopy) { scratch32[0] = promiseId; scratch32[1] = rid; scratch32[2] = -1; - return Deno.core.dispatch(opId, scratchBytes, zeroCopy); + return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy); } /** Returns Promise<number> */ -function sendAsync(opId, rid, zeroCopy = null) { +function sendAsync(opId, rid, ...zeroCopy) { const promiseId = nextPromiseId++; const p = createResolvable(); - const buf = send(promiseId, opId, rid, zeroCopy); + const buf = send(promiseId, opId, rid, ...zeroCopy); if (buf) { const record = recordFromBuf(buf); // Sync result. diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index a52f69fcb..233864fac 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -113,19 +113,19 @@ impl Isolate { fn register_sync_op<F>(&mut self, name: &'static str, handler: F) where - F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>, + F: 'static + Fn(State, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error>, { let state = self.state.clone(); let core_handler = move |_isolate_state: &mut CoreIsolateState, control_buf: &[u8], - zero_copy_buf: Option<ZeroCopyBuf>| + zero_copy_bufs: &mut [ZeroCopyBuf]| -> Op { let state = state.clone(); let record = Record::from(control_buf); let is_sync = record.promise_id == 0; assert!(is_sync); - let result: i32 = match handler(state, record.rid, zero_copy_buf) { + let result: i32 = match handler(state, record.rid, zero_copy_bufs) { Ok(r) => r as i32, Err(_) => -1, }; @@ -139,7 +139,7 @@ impl Isolate { fn register_op<F>( &mut self, name: &'static str, - handler: impl Fn(State, u32, Option<ZeroCopyBuf>) -> F + Copy + 'static, + handler: impl Fn(State, u32, &mut [ZeroCopyBuf]) -> F + Copy + 'static, ) where F: TryFuture, F::Ok: TryInto<i32>, @@ -148,15 +148,16 @@ impl Isolate { let state = self.state.clone(); let core_handler = move |_isolate_state: &mut CoreIsolateState, control_buf: &[u8], - zero_copy_buf: Option<ZeroCopyBuf>| + zero_copy_bufs: &mut [ZeroCopyBuf]| -> Op { let state = state.clone(); let record = Record::from(control_buf); let is_sync = record.promise_id == 0; assert!(!is_sync); + let mut zero_copy = zero_copy_bufs.to_vec(); let fut = async move { - let op = handler(state, record.rid, zero_copy_buf); + let op = handler(state, record.rid, &mut zero_copy); let result = op .map_ok(|r| r.try_into().expect("op result does not fit in i32")) .unwrap_or_else(|_| -1) @@ -182,7 +183,7 @@ impl Future for Isolate { fn op_close( state: State, rid: u32, - _buf: Option<ZeroCopyBuf>, + _buf: &mut [ZeroCopyBuf], ) -> Result<u32, Error> { debug!("close rid={}", rid); let resource_table = &mut state.borrow_mut().resource_table; @@ -195,7 +196,7 @@ fn op_close( fn op_listen( state: State, _rid: u32, - _buf: Option<ZeroCopyBuf>, + _buf: &mut [ZeroCopyBuf], ) -> Result<u32, Error> { debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); @@ -209,7 +210,7 @@ fn op_listen( fn op_accept( state: State, rid: u32, - _buf: Option<ZeroCopyBuf>, + _buf: &mut [ZeroCopyBuf], ) -> impl TryFuture<Ok = u32, Error = Error> { debug!("accept rid={}", rid); @@ -227,9 +228,11 @@ fn op_accept( fn op_read( state: State, rid: u32, - buf: Option<ZeroCopyBuf>, + bufs: &mut [ZeroCopyBuf], ) -> impl TryFuture<Ok = usize, Error = Error> { - let mut buf = buf.unwrap(); + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + let mut buf = bufs[0].clone(); + debug!("read rid={}", rid); poll_fn(move |cx| { @@ -244,9 +247,10 @@ fn op_read( fn op_write( state: State, rid: u32, - buf: Option<ZeroCopyBuf>, + bufs: &mut [ZeroCopyBuf], ) -> impl TryFuture<Ok = usize, Error = Error> { - let buf = buf.unwrap(); + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + let buf = bufs[0].clone(); debug!("write rid={}", rid); poll_fn(move |cx| { diff --git a/core/ops.rs b/core/ops.rs index ecece7355..bd9d58283 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -22,7 +22,7 @@ pub enum Op { /// Main type describing op pub type OpDispatcher = - dyn Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static; + dyn Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static; #[derive(Default)] pub struct OpRegistry { @@ -43,7 +43,7 @@ impl OpRegistry { pub fn register<F>(&mut self, name: &str, op: F) -> OpId where - F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static, + F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static, { let op_id = self.dispatchers.len() as u32; @@ -92,7 +92,7 @@ fn test_op_registry() { let dispatch = op_registry.get(test_id).unwrap(); let state_rc = CoreIsolate::state(&isolate); let mut state = state_rc.borrow_mut(); - let res = dispatch(&mut state, &[], None); + let res = dispatch(&mut state, &[], &mut []); if let Op::Sync(buf) = res { assert_eq!(buf.len(), 0); } else { @@ -139,7 +139,7 @@ fn register_op_during_call() { { let state_rc = CoreIsolate::state(&isolate); let mut state = state_rc.borrow_mut(); - dispatcher1(&mut state, &[], None); + dispatcher1(&mut state, &[], &mut []); } let mut expected = HashMap::new(); @@ -157,7 +157,7 @@ fn register_op_during_call() { }; let state_rc = CoreIsolate::state(&isolate); let mut state = state_rc.borrow_mut(); - let res = dispatcher2(&mut state, &[], None); + let res = dispatcher2(&mut state, &[], &mut []); if let Op::Sync(buf) = res { assert_eq!(buf.len(), 0); } else { diff --git a/core/plugin_api.rs b/core/plugin_api.rs index 2e93fdb77..16f5d4a36 100644 --- a/core/plugin_api.rs +++ b/core/plugin_api.rs @@ -15,8 +15,7 @@ pub use crate::ZeroCopyBuf; pub type InitFn = fn(&mut dyn Interface); -pub type DispatchOpFn = - fn(&mut dyn Interface, &[u8], Option<ZeroCopyBuf>) -> Op; +pub type DispatchOpFn = fn(&mut dyn Interface, &[u8], &mut [ZeroCopyBuf]) -> Op; pub trait Interface { fn register_op(&mut self, name: &str, dispatcher: DispatchOpFn) -> OpId; diff --git a/core/zero_copy_buf.rs b/core/zero_copy_buf.rs index b10c14045..25c468ffe 100644 --- a/core/zero_copy_buf.rs +++ b/core/zero_copy_buf.rs @@ -8,6 +8,7 @@ use std::ops::DerefMut; /// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It /// behaves much like an Arc<[u8]>, although a ZeroCopyBuf currently can't be /// cloned. +#[derive(Clone)] pub struct ZeroCopyBuf { backing_store: v8::SharedRef<v8::BackingStore>, byte_offset: usize, |