diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 1 | ||||
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 31 | ||||
-rw-r--r-- | core/io.rs | 271 | ||||
-rw-r--r-- | core/lib.rs | 4 | ||||
-rw-r--r-- | core/ops_builtin.rs | 74 | ||||
-rw-r--r-- | core/resources.rs | 191 |
6 files changed, 527 insertions, 45 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index 82ceab2a0..57dafa0e5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ v8_use_custom_libcxx = ["v8/use_custom_libcxx"] [dependencies] anyhow = "1.0.57" +bytes = "1" deno_ops = { path = "../ops", version = "0.32.0" } futures = "0.3.21" # Stay on 1.6 to avoid a dependency cycle in ahash https://github.com/tkaitchuck/aHash/issues/95 diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 7c895f326..6d61d35ec 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -10,7 +10,6 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; use std::cell::RefCell; use std::env; use std::net::SocketAddr; @@ -83,37 +82,23 @@ struct TcpStream { } impl TcpStream { - async fn read( - self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), Error> { + async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, Error> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - let nread = rd - .read(&mut buf) - .try_or_cancel(cancel) - .await - .map_err(Error::from)?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(cancel).await?; + Ok(nread) } - async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> { + async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, Error> { let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; - wr.write(&buf).await.map_err(Error::from) + let nwritten = wr.write(data).await?; + Ok(nwritten) } } impl Resource for TcpStream { - fn read_return( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { - Box::pin(self.write(buf)) - } + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); fn close(self: Rc<Self>) { self.cancel.cancel() diff --git a/core/io.rs b/core/io.rs new file mode 100644 index 000000000..7baad12e4 --- /dev/null +++ b/core/io.rs @@ -0,0 +1,271 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::ops::Deref; +use std::ops::DerefMut; + +use serde_v8::ZeroCopyBuf; + +/// BufView is a wrapper around an underlying contiguous chunk of bytes. It can +/// be created from a [ZeroCopyBuf], [bytes::Bytes], or [Vec<u8>] and implements +/// `Deref<[u8]>` and `AsRef<[u8]>`. +/// +/// The wrapper has the ability to constrain the exposed view to a sub-region of +/// the underlying buffer. This is useful for write operations, because they may +/// have to be called multiple times, with different views onto the buffer to be +/// able to write it entirely. +pub struct BufView { + inner: BufViewInner, + cursor: usize, +} + +enum BufViewInner { + Empty, + Bytes(bytes::Bytes), + ZeroCopy(ZeroCopyBuf), + Vec(Vec<u8>), +} + +impl BufView { + fn from_inner(inner: BufViewInner) -> Self { + Self { inner, cursor: 0 } + } + + pub fn empty() -> Self { + Self::from_inner(BufViewInner::Empty) + } + + /// Get the length of the buffer view. This is the length of the underlying + /// buffer minus the cursor position. + pub fn len(&self) -> usize { + match &self.inner { + BufViewInner::Empty => 0, + BufViewInner::Bytes(bytes) => bytes.len() - self.cursor, + BufViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor, + BufViewInner::Vec(vec) => vec.len() - self.cursor, + } + } + + /// Is the buffer view empty? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Advance the internal cursor of the buffer view by `n` bytes. + pub fn advance_cursor(&mut self, n: usize) { + assert!(self.len() >= n); + self.cursor += n; + } + + /// Reset the internal cursor of the buffer view to the beginning of the + /// buffer. Returns the old cursor position. + pub fn reset_cursor(&mut self) -> usize { + let old = self.cursor; + self.cursor = 0; + old + } +} + +impl Deref for BufView { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let buf = match &self.inner { + BufViewInner::Empty => &[], + BufViewInner::Bytes(bytes) => bytes.deref(), + BufViewInner::ZeroCopy(zero_copy) => zero_copy.deref(), + BufViewInner::Vec(vec) => vec.deref(), + }; + &buf[self.cursor..] + } +} + +impl AsRef<[u8]> for BufView { + fn as_ref(&self) -> &[u8] { + self.deref() + } +} + +impl From<ZeroCopyBuf> for BufView { + fn from(buf: ZeroCopyBuf) -> Self { + Self::from_inner(BufViewInner::ZeroCopy(buf)) + } +} + +impl From<Vec<u8>> for BufView { + fn from(vec: Vec<u8>) -> Self { + Self::from_inner(BufViewInner::Vec(vec)) + } +} + +impl From<bytes::Bytes> for BufView { + fn from(buf: bytes::Bytes) -> Self { + Self::from_inner(BufViewInner::Bytes(buf)) + } +} + +impl From<BufView> for bytes::Bytes { + fn from(buf: BufView) -> Self { + match buf.inner { + BufViewInner::Empty => bytes::Bytes::new(), + BufViewInner::Bytes(bytes) => bytes, + BufViewInner::ZeroCopy(zero_copy) => zero_copy.into(), + BufViewInner::Vec(vec) => vec.into(), + } + } +} + +/// BufMutView is a wrapper around an underlying contiguous chunk of writable +/// bytes. It can be created from a `ZeroCopyBuf` or a `Vec<u8>` and implements +/// `DerefMut<[u8]>` and `AsMut<[u8]>`. +/// +/// The wrapper has the ability to constrain the exposed view to a sub-region of +/// the underlying buffer. This is useful for write operations, because they may +/// have to be called multiple times, with different views onto the buffer to be +/// able to write it entirely. +/// +/// A `BufMutView` can be turned into a `BufView` by calling `BufMutView::into_view`. +pub struct BufMutView { + inner: BufMutViewInner, + cursor: usize, +} + +enum BufMutViewInner { + ZeroCopy(ZeroCopyBuf), + Vec(Vec<u8>), +} + +impl BufMutView { + fn from_inner(inner: BufMutViewInner) -> Self { + Self { inner, cursor: 0 } + } + + pub fn new(len: usize) -> Self { + Self::from_inner(BufMutViewInner::Vec(vec![0; len])) + } + + /// Get the length of the buffer view. This is the length of the underlying + /// buffer minus the cursor position. + pub fn len(&self) -> usize { + match &self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor, + BufMutViewInner::Vec(vec) => vec.len() - self.cursor, + } + } + + /// Is the buffer view empty? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Advance the internal cursor of the buffer view by `n` bytes. + pub fn advance_cursor(&mut self, n: usize) { + assert!(self.len() >= n); + self.cursor += n; + } + + /// Reset the internal cursor of the buffer view to the beginning of the + /// buffer. Returns the old cursor position. + pub fn reset_cursor(&mut self) -> usize { + let old = self.cursor; + self.cursor = 0; + old + } + + /// Turn this `BufMutView` into a `BufView`. + pub fn into_view(self) -> BufView { + let inner = match self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => BufViewInner::ZeroCopy(zero_copy), + BufMutViewInner::Vec(vec) => BufViewInner::Vec(vec), + }; + BufView { + inner, + cursor: self.cursor, + } + } + + /// Unwrap the underlying buffer into a `Vec<u8>`, consuming the `BufMutView`. + /// + /// This method panics when called on a `BufMutView` that was created from a + /// `ZeroCopyBuf`. + pub fn unwrap_vec(self) -> Vec<u8> { + match self.inner { + BufMutViewInner::ZeroCopy(_) => { + panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec"); + } + BufMutViewInner::Vec(vec) => vec, + } + } + + /// Get a mutable reference to an underlying `Vec<u8>`. + /// + /// This method panics when called on a `BufMutView` that was created from a + /// `ZeroCopyBuf`. + pub fn get_mut_vec(&mut self) -> &mut Vec<u8> { + match &mut self.inner { + BufMutViewInner::ZeroCopy(_) => { + panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec"); + } + BufMutViewInner::Vec(vec) => vec, + } + } +} + +impl Deref for BufMutView { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let buf = match &self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref(), + BufMutViewInner::Vec(vec) => vec.deref(), + }; + &buf[self.cursor..] + } +} + +impl DerefMut for BufMutView { + fn deref_mut(&mut self) -> &mut [u8] { + let buf = match &mut self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref_mut(), + BufMutViewInner::Vec(vec) => vec.deref_mut(), + }; + &mut buf[self.cursor..] + } +} + +impl AsRef<[u8]> for BufMutView { + fn as_ref(&self) -> &[u8] { + self.deref() + } +} + +impl AsMut<[u8]> for BufMutView { + fn as_mut(&mut self) -> &mut [u8] { + self.deref_mut() + } +} + +impl From<ZeroCopyBuf> for BufMutView { + fn from(buf: ZeroCopyBuf) -> Self { + Self::from_inner(BufMutViewInner::ZeroCopy(buf)) + } +} + +impl From<Vec<u8>> for BufMutView { + fn from(buf: Vec<u8>) -> Self { + Self::from_inner(BufMutViewInner::Vec(buf)) + } +} + +pub enum WriteOutcome { + Partial { nwritten: usize, view: BufView }, + Full { nwritten: usize }, +} + +impl WriteOutcome { + pub fn nwritten(&self) -> usize { + match self { + WriteOutcome::Partial { nwritten, .. } => *nwritten, + WriteOutcome::Full { nwritten } => *nwritten, + } + } +} diff --git a/core/lib.rs b/core/lib.rs index 8043b5e95..adda98046 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -8,6 +8,7 @@ mod extensions; mod flags; mod gotham_state; mod inspector; +mod io; mod module_specifier; mod modules; mod normalize_path; @@ -58,6 +59,9 @@ pub use crate::inspector::InspectorMsgKind; pub use crate::inspector::InspectorSessionProxy; pub use crate::inspector::JsRuntimeInspector; pub use crate::inspector::LocalInspectorSession; +pub use crate::io::BufMutView; +pub use crate::io::BufView; +pub use crate::io::WriteOutcome; pub use crate::module_specifier::resolve_import; pub use crate::module_specifier::resolve_path; pub use crate::module_specifier::resolve_url; diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 7393d4b69..41741bf28 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -2,6 +2,8 @@ use crate::error::format_file_name; use crate::error::type_error; use crate::include_js_files; +use crate::io::BufMutView; +use crate::io::BufView; use crate::ops_metrics::OpMetrics; use crate::resources::ResourceId; use crate::Extension; @@ -166,7 +168,8 @@ async fn op_read( buf: ZeroCopyBuf, ) -> Result<u32, Error> { let resource = state.borrow().resource_table.get_any(rid)?; - resource.read_return(buf).await.map(|(n, _)| n as u32) + let view = BufMutView::from(buf); + resource.read_byob(view).await.map(|(n, _)| n as u32) } #[op] @@ -175,18 +178,67 @@ async fn op_read_all( rid: ResourceId, ) -> Result<ZeroCopyBuf, Error> { let resource = state.borrow().resource_table.get_any(rid)?; - let (min, maximum) = resource.size_hint(); - let size = maximum.unwrap_or(min) as usize; - let mut buffer = Vec::with_capacity(size); + // The number of bytes we attempt to grow the buffer by each time it fills + // up and we have more data to read. We start at 64 KB. The grow_len is + // doubled if the nread returned from a single read is equal or greater than + // the grow_len. This allows us to reduce allocations for resources that can + // read large chunks of data at a time. + let mut grow_len: usize = 64 * 1024; + + let (min, maybe_max) = resource.size_hint(); + // Try to determine an optimial starting buffer size for this resource based + // on the size hint. + let initial_size = match (min, maybe_max) { + (min, Some(max)) if min == max => min as usize, + (_min, Some(max)) if (max as usize) < grow_len => max as usize, + (min, _) if (min as usize) < grow_len => grow_len, + (min, _) => min as usize, + }; + + let mut buf = BufMutView::new(initial_size); loop { - let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]); - let (nread, tmp) = resource.clone().read_return(tmp).await?; - if nread == 0 { - return Ok(buffer.into()); + // if the buffer does not have much remaining space, we may have to grow it. + if buf.len() < grow_len { + let vec = buf.get_mut_vec(); + match maybe_max { + Some(max) if vec.len() >= max as usize => { + // no need to resize the vec, because the vec is already large enough + // to accomodate the maximum size of the read data. + } + Some(max) if (max as usize) < vec.len() + grow_len => { + // grow the vec to the maximum size of the read data + vec.resize(max as usize, 0); + } + _ => { + // grow the vec by grow_len + vec.resize(vec.len() + grow_len, 0); + } + } + } + let (n, new_buf) = resource.clone().read_byob(buf).await?; + buf = new_buf; + buf.advance_cursor(n); + if n == 0 { + break; + } + if n >= grow_len { + // we managed to read more or equal data than fits in a single grow_len in + // a single go, so let's attempt to read even more next time. this reduces + // allocations for resources that can read large chunks of data at a time. + grow_len *= 2; } - buffer.extend_from_slice(&tmp[..nread]); } + + let nread = buf.reset_cursor(); + let mut vec = buf.unwrap_vec(); + // If the buffer is larger than the amount of data read, shrink it to the + // amount of data read. + if nread < vec.len() { + vec.truncate(nread); + } + + Ok(ZeroCopyBuf::from(vec)) } #[op] @@ -196,7 +248,9 @@ async fn op_write( buf: ZeroCopyBuf, ) -> Result<u32, Error> { let resource = state.borrow().resource_table.get_any(rid)?; - resource.write(buf).await.map(|n| n as u32) + let view = BufView::from(buf); + let resp = resource.write(view).await?; + Ok(resp.nwritten() as u32) } #[op] diff --git a/core/resources.rs b/core/resources.rs index 1a1ba3193..ee9ee689f 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -8,7 +8,9 @@ use crate::error::bad_resource_id; use crate::error::not_supported; -use crate::ZeroCopyBuf; +use crate::io::BufMutView; +use crate::io::BufView; +use crate::io::WriteOutcome; use anyhow::Error; use futures::Future; use std::any::type_name; @@ -23,9 +25,51 @@ use std::rc::Rc; /// Returned by resource read/write/shutdown methods pub type AsyncResult<T> = Pin<Box<dyn Future<Output = Result<T, Error>>>>; -/// All objects that can be store in the resource table should implement the -/// `Resource` trait. -/// TODO(@AaronO): investigate avoiding alloc on read/write/shutdown +/// Resources are Rust objects that are attached to a [deno_core::JsRuntime]. +/// They are identified in JS by a numeric ID (the resource ID, or rid). +/// Resources can be created in ops. Resources can also be retrieved in ops by +/// their rid. Resources are not thread-safe - they can only be accessed from +/// the thread that the JsRuntime lives on. +/// +/// Resources are reference counted in Rust. This means that they can be +/// cloned and passed around. When the last reference is dropped, the resource +/// is automatically closed. As long as the resource exists in the resource +/// table, the reference count is at least 1. +/// +/// ### Readable +/// +/// Readable resources are resources that can have data read from. Examples of +/// this are files, sockets, or HTTP streams. +/// +/// Readables can be read from from either JS or Rust. In JS one can use +/// `Deno.core.read()` to read from a single chunk of data from a readable. In +/// Rust one can directly call `read()` or `read_byob()`. The Rust side code is +/// used to implement ops like `op_slice`. +/// +/// A distinction can be made between readables that produce chunks of data +/// themselves (they allocate the chunks), and readables that fill up +/// bring-your-own-buffers (BYOBs). The former is often the case for framed +/// protocols like HTTP, while the latter is often the case for kernel backed +/// resources like files and sockets. +/// +/// All readables must implement `read()`. If resources can support an optimized +/// path for BYOBs, they should also implement `read_byob()`. For kernel backed +/// resources it often makes sense to implement `read_byob()` first, and then +/// implement `read()` as an operation that allocates a new chunk with +/// `len == limit`, then calls `read_byob()`, and then returns a chunk sliced to +/// the number of bytes read. Kernel backed resources can use the +/// [deno_core::impl_readable_byob] macro to implement optimized `read_byob()` +/// and `read()` implementations from a single `Self::read()` method. +/// +/// ### Writable +/// +/// Writable resources are resources that can have data written to. Examples of +/// this are files, sockets, or HTTP streams. +/// +/// Writables can be written to from either JS or Rust. In JS one can use +/// `Deno.core.write()` to write to a single chunk of data to a writable. In +/// Rust one can directly call `write()`. The latter is used to implement ops +/// like `op_slice`. pub trait Resource: Any + 'static { /// Returns a string representation of the resource which is made available /// to JavaScript code through `op_resources`. The default implementation @@ -35,20 +79,86 @@ pub trait Resource: Any + 'static { type_name::<Self>().into() } - /// Resources may implement `read_return()` to be a readable stream - fn read_return( - self: Rc<Self>, - _buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + /// Read a single chunk of data from the resource. This operation returns a + /// `BufView` that represents the data that was read. If a zero length buffer + /// is returned, it indicates that the resource has reached EOF. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + /// + /// If a readable can provide an optimized path for BYOBs, it should also + /// implement `read_byob()`. + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { + _ = limit; Box::pin(futures::future::err(not_supported())) } - /// Resources may implement `write()` to be a writable stream - fn write(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> { + /// Read a single chunk of data from the resource into the provided `BufMutView`. + /// + /// This operation returns the number of bytes read. If zero bytes are read, + /// it indicates that the resource has reached EOF. + /// + /// If this method is not implemented explicitly, the default implementation + /// will call `read()` and then copy the data into the provided buffer. For + /// readable resources that can provide an optimized path for BYOBs, it is + /// strongly recommended to override this method. + fn read_byob( + self: Rc<Self>, + mut buf: BufMutView, + ) -> AsyncResult<(usize, BufMutView)> { + Box::pin(async move { + let read = self.read(buf.len()).await?; + let nread = read.len(); + buf[..nread].copy_from_slice(&read); + Ok((nread, buf)) + }) + } + + /// Write a single chunk of data to the resource. The operation may not be + /// able to write the entire chunk, in which case it should return the number + /// of bytes written. Additionally it should return the `BufView` that was + /// passed in. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> { + _ = buf; Box::pin(futures::future::err(not_supported())) } - /// Resources may implement `shutdown()` for graceful async shutdowns + /// Write an entire chunk of data to the resource. Unlike `write()`, this will + /// ensure the entire chunk is written. If the operation is not able to write + /// the entire chunk, an error is to be returned. + /// + /// By default this method will call `write()` repeatedly until the entire + /// chunk is written. Resources that can write the entire chunk in a single + /// operation using an optimized path should override this method. + fn write_all(self: Rc<Self>, view: BufView) -> AsyncResult<()> { + Box::pin(async move { + let mut view = view; + let this = self; + while !view.is_empty() { + let resp = this.clone().write(view).await?; + match resp { + WriteOutcome::Partial { + nwritten, + view: new_view, + } => { + view = new_view; + view.advance_cursor(nwritten); + } + WriteOutcome::Full { .. } => break, + } + } + Ok(()) + }) + } + + /// The shutdown method can be used to asynchronously close the resource. It + /// is not automatically called when the resource is dropped or closed. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. fn shutdown(self: Rc<Self>) -> AsyncResult<()> { Box::pin(futures::future::err(not_supported())) } @@ -229,3 +339,60 @@ impl ResourceTable { .map(|(&id, resource)| (id, resource.name())) } } + +#[macro_export] +macro_rules! impl_readable_byob { + () => { + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<$crate::BufView> { + Box::pin(async move { + let mut vec = vec![0; limit]; + let nread = self.read(&mut vec).await?; + if nread != vec.len() { + vec.truncate(nread); + } + let view = $crate::BufView::from(vec); + Ok(view) + }) + } + + fn read_byob( + self: Rc<Self>, + mut buf: $crate::BufMutView, + ) -> AsyncResult<(usize, $crate::BufMutView)> { + Box::pin(async move { + let nread = self.read(buf.as_mut()).await?; + Ok((nread, buf)) + }) + } + }; +} + +#[macro_export] +macro_rules! impl_writable { + (__write) => { + fn write( + self: Rc<Self>, + view: $crate::BufView, + ) -> AsyncResult<$crate::WriteOutcome> { + Box::pin(async move { + let nwritten = self.write(&view).await?; + Ok($crate::WriteOutcome::Partial { nwritten, view }) + }) + } + }; + (__write_all) => { + fn write_all(self: Rc<Self>, view: $crate::BufView) -> AsyncResult<()> { + Box::pin(async move { + self.write_all(&view).await?; + Ok(()) + }) + } + }; + () => { + $crate::impl_writable!(__write); + }; + (with_all) => { + $crate::impl_writable!(__write); + $crate::impl_writable!(__write_all); + }; +} |