diff options
author | Aaron O'Mullan <aaron.omullan@gmail.com> | 2021-03-31 16:37:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-31 10:37:38 -0400 |
commit | fec1b2a5a4324a7eecdfbb2471931f3b6b0139c5 (patch) | |
tree | 8a650553c2d70e047d9d7365f9ac8702ec9861a5 /core/ops_bin.rs | |
parent | 6dc3549a818ad49b3907d18c93fd422a9cc743a5 (diff) |
refactor: new optimized op-layer using serde_v8 (#9843)
- Improves op performance.
- Handle op-metadata (errors, promise IDs) explicitly in the op-layer vs
per op-encoding (aka: out-of-payload).
- Remove shared queue & custom "asyncHandlers", all async values are
returned in batches via js_recv_cb.
- The op-layer should be thought of as simple function calls with little
indirection or translation besides the conceptually straightforward
serde_v8 bijections.
- Preserve concepts of json/bin/min as semantic groups of their
inputs/outputs instead of their op-encoding strategy, preserving these
groups will also facilitate partial transitions over to v8 Fast API for the
"min" and "bin" groups
Diffstat (limited to 'core/ops_bin.rs')
-rw-r--r-- | core/ops_bin.rs | 340 |
1 files changed, 67 insertions, 273 deletions
diff --git a/core/ops_bin.rs b/core/ops_bin.rs index 3e13c23d5..cdd2d630d 100644 --- a/core/ops_bin.rs +++ b/core/ops_bin.rs @@ -1,54 +1,23 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +use crate::error::type_error; use crate::error::AnyError; use crate::futures::future::FutureExt; +use crate::serialize_op_result; use crate::BufVec; use crate::Op; use crate::OpFn; +use crate::OpPayload; +use crate::OpResponse; use crate::OpState; use crate::ZeroCopyBuf; use std::boxed::Box; use std::cell::RefCell; -use std::convert::TryInto; use std::future::Future; use std::rc::Rc; -#[derive(Copy, Clone, Debug, PartialEq)] -pub struct RequestHeader { - pub request_id: u64, - pub argument: u32, -} - -impl RequestHeader { - pub fn from_raw(bytes: &[u8]) -> Option<Self> { - if bytes.len() < 3 * 4 { - return None; - } - - Some(Self { - request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()), - argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()), - }) - } -} - -#[derive(Copy, Clone, Debug, PartialEq)] -pub struct ResponseHeader { - pub request_id: u64, - pub status: u32, - pub result: u32, -} - -impl From<ResponseHeader> for [u8; 16] { - fn from(r: ResponseHeader) -> Self { - let mut resp_header = [0u8; 16]; - resp_header[0..8].copy_from_slice(&r.request_id.to_le_bytes()); - resp_header[8..12].copy_from_slice(&r.status.to_le_bytes()); - resp_header[12..16].copy_from_slice(&r.result.to_le_bytes()); - resp_header - } -} - +// TODO: rewrite this, to have consistent buffer returns +// possibly via direct serde_v8 support pub trait ValueOrVector { fn value(&self) -> u32; fn vector(self) -> Option<Vec<u8>>; @@ -72,10 +41,6 @@ impl ValueOrVector for u32 { } } -fn gen_padding_32bit(len: usize) -> &'static [u8] { - &[b' ', b' ', b' '][0..(4 - (len & 3)) & 3] -} - /// Creates an op that passes data synchronously using raw ui8 buffer. /// /// The provided function `op_fn` has the following parameters: @@ -104,78 +69,47 @@ where F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static, R: ValueOrVector, { - Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().expect("Expected record at position 0"); - let mut zero_copy = bufs_iter.collect::<BufVec>(); - - let req_header = match RequestHeader::from_raw(&record_buf) { - Some(r) => r, - None => { - let error_class = b"TypeError"; - let error_message = b"Unparsable control buffer"; - let len = error_class.len() + error_message.len(); - let padding = gen_padding_32bit(len); - let resp_header = ResponseHeader { - request_id: 0, - status: 1, - result: error_class.len() as u32, - }; - return Op::Sync( - error_class - .iter() - .chain(error_message.iter()) - .chain(padding) - .chain(&Into::<[u8; 16]>::into(resp_header)) - .cloned() - .collect(), - ); - } - }; + Box::new(move |state, payload, buf| -> Op { + let min_arg: u32 = payload.deserialize().unwrap(); + // For sig compat map Option<ZeroCopyBuf> to BufVec + let mut bufs: BufVec = match buf { + Some(b) => vec![b], + None => vec![], + } + .into(); + // Bin op buffer arg assert + if bufs.is_empty() { + return Op::Sync(serialize_bin_result::<u32>( + Err(type_error("bin-ops require a non-null buffer arg")), + state, + )); + } - match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) { - Ok(possibly_vector) => { - let resp_header = ResponseHeader { - request_id: req_header.request_id, - status: 0, - result: possibly_vector.value(), - }; - let resp_encoded_header = Into::<[u8; 16]>::into(resp_header); + let result = op_fn(&mut state.borrow_mut(), min_arg, &mut bufs); + Op::Sync(serialize_bin_result(result, state)) + }) +} - let resp_vector = match possibly_vector.vector() { - Some(mut vector) => { - let padding = gen_padding_32bit(vector.len()); - vector.extend(padding); - vector.extend(&resp_encoded_header); - vector - } - None => resp_encoded_header.to_vec(), - }; - Op::Sync(resp_vector.into_boxed_slice()) - } - Err(error) => { - let error_class = - (state.borrow().get_error_class_fn)(&error).as_bytes(); - let error_message = error.to_string().as_bytes().to_owned(); - let len = error_class.len() + error_message.len(); - let padding = gen_padding_32bit(len); - let resp_header = ResponseHeader { - request_id: req_header.request_id, - status: 1, - result: error_class.len() as u32, - }; - return Op::Sync( - error_class - .iter() - .chain(error_message.iter()) - .chain(padding) - .chain(&Into::<[u8; 16]>::into(resp_header)) - .cloned() - .collect(), - ); +// wraps serialize_op_result but handles ValueOrVector +fn serialize_bin_result<R>( + result: Result<R, AnyError>, + state: Rc<RefCell<OpState>>, +) -> OpResponse +where + R: ValueOrVector, +{ + match result { + Ok(v) => { + let min_val = v.value(); + match v.vector() { + // Warning! this is incorrect, but buffers aren't use ATM, will fix in future PR + Some(vec) => OpResponse::Buffer(vec.into()), + // u32 + None => serialize_op_result(Ok(min_val), state), } } - }) + Err(e) => serialize_op_result::<()>(Err(e), state), + } } /// Creates an op that passes data asynchronously using raw ui8 buffer. @@ -208,170 +142,30 @@ where R: Future<Output = Result<RV, AnyError>> + 'static, RV: ValueOrVector, { - Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().expect("Expected record at position 0"); - let zero_copy = bufs_iter.collect::<BufVec>(); - - let req_header = match RequestHeader::from_raw(&record_buf) { - Some(r) => r, - None => { - let error_class = b"TypeError"; - let error_message = b"Unparsable control buffer"; - let len = error_class.len() + error_message.len(); - let padding = gen_padding_32bit(len); - let resp_header = ResponseHeader { - request_id: 0, - status: 1, - result: error_class.len() as u32, - }; - return Op::Sync( - error_class - .iter() - .chain(error_message.iter()) - .chain(padding) - .chain(&Into::<[u8; 16]>::into(resp_header)) - .cloned() - .collect(), - ); + Box::new( + move |state: Rc<RefCell<OpState>>, + p: OpPayload, + b: Option<ZeroCopyBuf>| + -> Op { + let min_arg: u32 = p.deserialize().unwrap(); + // For sig compat map Option<ZeroCopyBuf> to BufVec + let bufs: BufVec = match b { + Some(b) => vec![b], + None => vec![], + } + .into(); + // Bin op buffer arg assert + if bufs.is_empty() { + return Op::Sync(serialize_bin_result::<u32>( + Err(type_error("bin-ops require a non-null buffer arg")), + state, + )); } - }; - - let fut = - op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| { - match result { - Ok(possibly_vector) => { - let resp_header = ResponseHeader { - request_id: req_header.request_id, - status: 0, - result: possibly_vector.value(), - }; - let resp_encoded_header = Into::<[u8; 16]>::into(resp_header); - - let resp_vector = match possibly_vector.vector() { - Some(mut vector) => { - let padding = gen_padding_32bit(vector.len()); - vector.extend(padding); - vector.extend(&resp_encoded_header); - vector - } - None => resp_encoded_header.to_vec(), - }; - resp_vector.into_boxed_slice() - } - Err(error) => { - let error_class = - (state.borrow().get_error_class_fn)(&error).as_bytes(); - let error_message = error.to_string().as_bytes().to_owned(); - let len = error_class.len() + error_message.len(); - let padding = gen_padding_32bit(len); - let resp_header = ResponseHeader { - request_id: req_header.request_id, - status: 1, - result: error_class.len() as u32, - }; - - error_class - .iter() - .chain(error_message.iter()) - .chain(padding) - .chain(&Into::<[u8; 16]>::into(resp_header)) - .cloned() - .collect() - } - } - }); - let temp = Box::pin(fut); - Op::Async(temp) - }) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn padding() { - assert_eq!(gen_padding_32bit(0), &[] as &[u8]); - assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']); - assert_eq!(gen_padding_32bit(2), &[b' ', b' ']); - assert_eq!(gen_padding_32bit(3), &[b' ']); - assert_eq!(gen_padding_32bit(4), &[] as &[u8]); - assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']); - } - - #[test] - fn response_header_to_bytes() { - // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´ - let resp_header = ResponseHeader { - request_id: 0x0102030405060708u64, - status: 0x090A0B0Cu32, - result: 0x0D0E0F10u32, - }; - - // All numbers are always little-endian encoded, as the js side also wants this to be fixed - assert_eq!( - &Into::<[u8; 16]>::into(resp_header), - &[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13] - ); - } - - #[test] - fn response_header_to_bytes_max_value() { - // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´ - let resp_header = ResponseHeader { - request_id: (1u64 << 53u64) - 1u64, - status: 0xFFFFFFFFu32, - result: 0xFFFFFFFFu32, - }; - - // All numbers are always little-endian encoded, as the js side also wants this to be fixed - assert_eq!( - &Into::<[u8; 16]>::into(resp_header), - &[ - 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255, - 255 - ] - ); - } - - #[test] - fn request_header_from_bytes() { - let req_header = - RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9]) - .unwrap(); - - assert_eq!(req_header.request_id, 0x0102030405060708u64); - assert_eq!(req_header.argument, 0x090A0B0Cu32); - } - - #[test] - fn request_header_from_bytes_max_value() { - let req_header = RequestHeader::from_raw(&[ - 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, - ]) - .unwrap(); - - assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64); - assert_eq!(req_header.argument, 0xFFFFFFFFu32); - } - - #[test] - fn request_header_from_bytes_too_short() { - let req_header = - RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]); - - assert_eq!(req_header, None); - } - - #[test] - fn request_header_from_bytes_long() { - let req_header = RequestHeader::from_raw(&[ - 8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21, - ]) - .unwrap(); - assert_eq!(req_header.request_id, 0x0102030405060708u64); - assert_eq!(req_header.argument, 0x090A0B0Cu32); - } + let fut = op_fn(state.clone(), min_arg, bufs) + .map(move |result| serialize_bin_result(result, state)); + let temp = Box::pin(fut); + Op::Async(temp) + }, + ) } |