summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/Cargo.toml1
-rw-r--r--core/examples/http_bench_json_ops.rs31
-rw-r--r--core/io.rs271
-rw-r--r--core/lib.rs4
-rw-r--r--core/ops_builtin.rs74
-rw-r--r--core/resources.rs191
6 files changed, 527 insertions, 45 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 82ceab2a0..57dafa0e5 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -18,6 +18,7 @@ v8_use_custom_libcxx = ["v8/use_custom_libcxx"]
[dependencies]
anyhow = "1.0.57"
+bytes = "1"
deno_ops = { path = "../ops", version = "0.32.0" }
futures = "0.3.21"
# Stay on 1.6 to avoid a dependency cycle in ahash https://github.com/tkaitchuck/aHash/issues/95
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index 7c895f326..6d61d35ec 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -10,7 +10,6 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
use std::cell::RefCell;
use std::env;
use std::net::SocketAddr;
@@ -83,37 +82,23 @@ struct TcpStream {
}
impl TcpStream {
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), Error> {
+ async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, Error> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = rd
- .read(&mut buf)
- .try_or_cancel(cancel)
- .await
- .map_err(Error::from)?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(cancel).await?;
+ Ok(nread)
}
- async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
+ async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, Error> {
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
- wr.write(&buf).await.map_err(Error::from)
+ let nwritten = wr.write(data).await?;
+ Ok(nwritten)
}
}
impl Resource for TcpStream {
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
fn close(self: Rc<Self>) {
self.cancel.cancel()
diff --git a/core/io.rs b/core/io.rs
new file mode 100644
index 000000000..7baad12e4
--- /dev/null
+++ b/core/io.rs
@@ -0,0 +1,271 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::ops::Deref;
+use std::ops::DerefMut;
+
+use serde_v8::ZeroCopyBuf;
+
+/// BufView is a wrapper around an underlying contiguous chunk of bytes. It can
+/// be created from a [ZeroCopyBuf], [bytes::Bytes], or [Vec<u8>] and implements
+/// `Deref<[u8]>` and `AsRef<[u8]>`.
+///
+/// The wrapper has the ability to constrain the exposed view to a sub-region of
+/// the underlying buffer. This is useful for write operations, because they may
+/// have to be called multiple times, with different views onto the buffer to be
+/// able to write it entirely.
+pub struct BufView {
+ inner: BufViewInner,
+ cursor: usize,
+}
+
+enum BufViewInner {
+ Empty,
+ Bytes(bytes::Bytes),
+ ZeroCopy(ZeroCopyBuf),
+ Vec(Vec<u8>),
+}
+
+impl BufView {
+ fn from_inner(inner: BufViewInner) -> Self {
+ Self { inner, cursor: 0 }
+ }
+
+ pub fn empty() -> Self {
+ Self::from_inner(BufViewInner::Empty)
+ }
+
+ /// Get the length of the buffer view. This is the length of the underlying
+ /// buffer minus the cursor position.
+ pub fn len(&self) -> usize {
+ match &self.inner {
+ BufViewInner::Empty => 0,
+ BufViewInner::Bytes(bytes) => bytes.len() - self.cursor,
+ BufViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor,
+ BufViewInner::Vec(vec) => vec.len() - self.cursor,
+ }
+ }
+
+ /// Is the buffer view empty?
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Advance the internal cursor of the buffer view by `n` bytes.
+ pub fn advance_cursor(&mut self, n: usize) {
+ assert!(self.len() >= n);
+ self.cursor += n;
+ }
+
+ /// Reset the internal cursor of the buffer view to the beginning of the
+ /// buffer. Returns the old cursor position.
+ pub fn reset_cursor(&mut self) -> usize {
+ let old = self.cursor;
+ self.cursor = 0;
+ old
+ }
+}
+
+impl Deref for BufView {
+ type Target = [u8];
+
+ fn deref(&self) -> &[u8] {
+ let buf = match &self.inner {
+ BufViewInner::Empty => &[],
+ BufViewInner::Bytes(bytes) => bytes.deref(),
+ BufViewInner::ZeroCopy(zero_copy) => zero_copy.deref(),
+ BufViewInner::Vec(vec) => vec.deref(),
+ };
+ &buf[self.cursor..]
+ }
+}
+
+impl AsRef<[u8]> for BufView {
+ fn as_ref(&self) -> &[u8] {
+ self.deref()
+ }
+}
+
+impl From<ZeroCopyBuf> for BufView {
+ fn from(buf: ZeroCopyBuf) -> Self {
+ Self::from_inner(BufViewInner::ZeroCopy(buf))
+ }
+}
+
+impl From<Vec<u8>> for BufView {
+ fn from(vec: Vec<u8>) -> Self {
+ Self::from_inner(BufViewInner::Vec(vec))
+ }
+}
+
+impl From<bytes::Bytes> for BufView {
+ fn from(buf: bytes::Bytes) -> Self {
+ Self::from_inner(BufViewInner::Bytes(buf))
+ }
+}
+
+impl From<BufView> for bytes::Bytes {
+ fn from(buf: BufView) -> Self {
+ match buf.inner {
+ BufViewInner::Empty => bytes::Bytes::new(),
+ BufViewInner::Bytes(bytes) => bytes,
+ BufViewInner::ZeroCopy(zero_copy) => zero_copy.into(),
+ BufViewInner::Vec(vec) => vec.into(),
+ }
+ }
+}
+
+/// BufMutView is a wrapper around an underlying contiguous chunk of writable
+/// bytes. It can be created from a `ZeroCopyBuf` or a `Vec<u8>` and implements
+/// `DerefMut<[u8]>` and `AsMut<[u8]>`.
+///
+/// The wrapper has the ability to constrain the exposed view to a sub-region of
+/// the underlying buffer. This is useful for write operations, because they may
+/// have to be called multiple times, with different views onto the buffer to be
+/// able to write it entirely.
+///
+/// A `BufMutView` can be turned into a `BufView` by calling `BufMutView::into_view`.
+pub struct BufMutView {
+ inner: BufMutViewInner,
+ cursor: usize,
+}
+
+enum BufMutViewInner {
+ ZeroCopy(ZeroCopyBuf),
+ Vec(Vec<u8>),
+}
+
+impl BufMutView {
+ fn from_inner(inner: BufMutViewInner) -> Self {
+ Self { inner, cursor: 0 }
+ }
+
+ pub fn new(len: usize) -> Self {
+ Self::from_inner(BufMutViewInner::Vec(vec![0; len]))
+ }
+
+ /// Get the length of the buffer view. This is the length of the underlying
+ /// buffer minus the cursor position.
+ pub fn len(&self) -> usize {
+ match &self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor,
+ BufMutViewInner::Vec(vec) => vec.len() - self.cursor,
+ }
+ }
+
+ /// Is the buffer view empty?
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Advance the internal cursor of the buffer view by `n` bytes.
+ pub fn advance_cursor(&mut self, n: usize) {
+ assert!(self.len() >= n);
+ self.cursor += n;
+ }
+
+ /// Reset the internal cursor of the buffer view to the beginning of the
+ /// buffer. Returns the old cursor position.
+ pub fn reset_cursor(&mut self) -> usize {
+ let old = self.cursor;
+ self.cursor = 0;
+ old
+ }
+
+ /// Turn this `BufMutView` into a `BufView`.
+ pub fn into_view(self) -> BufView {
+ let inner = match self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => BufViewInner::ZeroCopy(zero_copy),
+ BufMutViewInner::Vec(vec) => BufViewInner::Vec(vec),
+ };
+ BufView {
+ inner,
+ cursor: self.cursor,
+ }
+ }
+
+ /// Unwrap the underlying buffer into a `Vec<u8>`, consuming the `BufMutView`.
+ ///
+ /// This method panics when called on a `BufMutView` that was created from a
+ /// `ZeroCopyBuf`.
+ pub fn unwrap_vec(self) -> Vec<u8> {
+ match self.inner {
+ BufMutViewInner::ZeroCopy(_) => {
+ panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec");
+ }
+ BufMutViewInner::Vec(vec) => vec,
+ }
+ }
+
+ /// Get a mutable reference to an underlying `Vec<u8>`.
+ ///
+ /// This method panics when called on a `BufMutView` that was created from a
+ /// `ZeroCopyBuf`.
+ pub fn get_mut_vec(&mut self) -> &mut Vec<u8> {
+ match &mut self.inner {
+ BufMutViewInner::ZeroCopy(_) => {
+ panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec");
+ }
+ BufMutViewInner::Vec(vec) => vec,
+ }
+ }
+}
+
+impl Deref for BufMutView {
+ type Target = [u8];
+
+ fn deref(&self) -> &[u8] {
+ let buf = match &self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref(),
+ BufMutViewInner::Vec(vec) => vec.deref(),
+ };
+ &buf[self.cursor..]
+ }
+}
+
+impl DerefMut for BufMutView {
+ fn deref_mut(&mut self) -> &mut [u8] {
+ let buf = match &mut self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref_mut(),
+ BufMutViewInner::Vec(vec) => vec.deref_mut(),
+ };
+ &mut buf[self.cursor..]
+ }
+}
+
+impl AsRef<[u8]> for BufMutView {
+ fn as_ref(&self) -> &[u8] {
+ self.deref()
+ }
+}
+
+impl AsMut<[u8]> for BufMutView {
+ fn as_mut(&mut self) -> &mut [u8] {
+ self.deref_mut()
+ }
+}
+
+impl From<ZeroCopyBuf> for BufMutView {
+ fn from(buf: ZeroCopyBuf) -> Self {
+ Self::from_inner(BufMutViewInner::ZeroCopy(buf))
+ }
+}
+
+impl From<Vec<u8>> for BufMutView {
+ fn from(buf: Vec<u8>) -> Self {
+ Self::from_inner(BufMutViewInner::Vec(buf))
+ }
+}
+
+pub enum WriteOutcome {
+ Partial { nwritten: usize, view: BufView },
+ Full { nwritten: usize },
+}
+
+impl WriteOutcome {
+ pub fn nwritten(&self) -> usize {
+ match self {
+ WriteOutcome::Partial { nwritten, .. } => *nwritten,
+ WriteOutcome::Full { nwritten } => *nwritten,
+ }
+ }
+}
diff --git a/core/lib.rs b/core/lib.rs
index 8043b5e95..adda98046 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -8,6 +8,7 @@ mod extensions;
mod flags;
mod gotham_state;
mod inspector;
+mod io;
mod module_specifier;
mod modules;
mod normalize_path;
@@ -58,6 +59,9 @@ pub use crate::inspector::InspectorMsgKind;
pub use crate::inspector::InspectorSessionProxy;
pub use crate::inspector::JsRuntimeInspector;
pub use crate::inspector::LocalInspectorSession;
+pub use crate::io::BufMutView;
+pub use crate::io::BufView;
+pub use crate::io::WriteOutcome;
pub use crate::module_specifier::resolve_import;
pub use crate::module_specifier::resolve_path;
pub use crate::module_specifier::resolve_url;
diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs
index 7393d4b69..41741bf28 100644
--- a/core/ops_builtin.rs
+++ b/core/ops_builtin.rs
@@ -2,6 +2,8 @@
use crate::error::format_file_name;
use crate::error::type_error;
use crate::include_js_files;
+use crate::io::BufMutView;
+use crate::io::BufView;
use crate::ops_metrics::OpMetrics;
use crate::resources::ResourceId;
use crate::Extension;
@@ -166,7 +168,8 @@ async fn op_read(
buf: ZeroCopyBuf,
) -> Result<u32, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- resource.read_return(buf).await.map(|(n, _)| n as u32)
+ let view = BufMutView::from(buf);
+ resource.read_byob(view).await.map(|(n, _)| n as u32)
}
#[op]
@@ -175,18 +178,67 @@ async fn op_read_all(
rid: ResourceId,
) -> Result<ZeroCopyBuf, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- let (min, maximum) = resource.size_hint();
- let size = maximum.unwrap_or(min) as usize;
- let mut buffer = Vec::with_capacity(size);
+ // The number of bytes we attempt to grow the buffer by each time it fills
+ // up and we have more data to read. We start at 64 KB. The grow_len is
+ // doubled if the nread returned from a single read is equal or greater than
+ // the grow_len. This allows us to reduce allocations for resources that can
+ // read large chunks of data at a time.
+ let mut grow_len: usize = 64 * 1024;
+
+ let (min, maybe_max) = resource.size_hint();
+ // Try to determine an optimial starting buffer size for this resource based
+ // on the size hint.
+ let initial_size = match (min, maybe_max) {
+ (min, Some(max)) if min == max => min as usize,
+ (_min, Some(max)) if (max as usize) < grow_len => max as usize,
+ (min, _) if (min as usize) < grow_len => grow_len,
+ (min, _) => min as usize,
+ };
+
+ let mut buf = BufMutView::new(initial_size);
loop {
- let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]);
- let (nread, tmp) = resource.clone().read_return(tmp).await?;
- if nread == 0 {
- return Ok(buffer.into());
+ // if the buffer does not have much remaining space, we may have to grow it.
+ if buf.len() < grow_len {
+ let vec = buf.get_mut_vec();
+ match maybe_max {
+ Some(max) if vec.len() >= max as usize => {
+ // no need to resize the vec, because the vec is already large enough
+ // to accomodate the maximum size of the read data.
+ }
+ Some(max) if (max as usize) < vec.len() + grow_len => {
+ // grow the vec to the maximum size of the read data
+ vec.resize(max as usize, 0);
+ }
+ _ => {
+ // grow the vec by grow_len
+ vec.resize(vec.len() + grow_len, 0);
+ }
+ }
+ }
+ let (n, new_buf) = resource.clone().read_byob(buf).await?;
+ buf = new_buf;
+ buf.advance_cursor(n);
+ if n == 0 {
+ break;
+ }
+ if n >= grow_len {
+ // we managed to read more or equal data than fits in a single grow_len in
+ // a single go, so let's attempt to read even more next time. this reduces
+ // allocations for resources that can read large chunks of data at a time.
+ grow_len *= 2;
}
- buffer.extend_from_slice(&tmp[..nread]);
}
+
+ let nread = buf.reset_cursor();
+ let mut vec = buf.unwrap_vec();
+ // If the buffer is larger than the amount of data read, shrink it to the
+ // amount of data read.
+ if nread < vec.len() {
+ vec.truncate(nread);
+ }
+
+ Ok(ZeroCopyBuf::from(vec))
}
#[op]
@@ -196,7 +248,9 @@ async fn op_write(
buf: ZeroCopyBuf,
) -> Result<u32, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- resource.write(buf).await.map(|n| n as u32)
+ let view = BufView::from(buf);
+ let resp = resource.write(view).await?;
+ Ok(resp.nwritten() as u32)
}
#[op]
diff --git a/core/resources.rs b/core/resources.rs
index 1a1ba3193..ee9ee689f 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -8,7 +8,9 @@
use crate::error::bad_resource_id;
use crate::error::not_supported;
-use crate::ZeroCopyBuf;
+use crate::io::BufMutView;
+use crate::io::BufView;
+use crate::io::WriteOutcome;
use anyhow::Error;
use futures::Future;
use std::any::type_name;
@@ -23,9 +25,51 @@ use std::rc::Rc;
/// Returned by resource read/write/shutdown methods
pub type AsyncResult<T> = Pin<Box<dyn Future<Output = Result<T, Error>>>>;
-/// All objects that can be store in the resource table should implement the
-/// `Resource` trait.
-/// TODO(@AaronO): investigate avoiding alloc on read/write/shutdown
+/// Resources are Rust objects that are attached to a [deno_core::JsRuntime].
+/// They are identified in JS by a numeric ID (the resource ID, or rid).
+/// Resources can be created in ops. Resources can also be retrieved in ops by
+/// their rid. Resources are not thread-safe - they can only be accessed from
+/// the thread that the JsRuntime lives on.
+///
+/// Resources are reference counted in Rust. This means that they can be
+/// cloned and passed around. When the last reference is dropped, the resource
+/// is automatically closed. As long as the resource exists in the resource
+/// table, the reference count is at least 1.
+///
+/// ### Readable
+///
+/// Readable resources are resources that can have data read from. Examples of
+/// this are files, sockets, or HTTP streams.
+///
+/// Readables can be read from from either JS or Rust. In JS one can use
+/// `Deno.core.read()` to read from a single chunk of data from a readable. In
+/// Rust one can directly call `read()` or `read_byob()`. The Rust side code is
+/// used to implement ops like `op_slice`.
+///
+/// A distinction can be made between readables that produce chunks of data
+/// themselves (they allocate the chunks), and readables that fill up
+/// bring-your-own-buffers (BYOBs). The former is often the case for framed
+/// protocols like HTTP, while the latter is often the case for kernel backed
+/// resources like files and sockets.
+///
+/// All readables must implement `read()`. If resources can support an optimized
+/// path for BYOBs, they should also implement `read_byob()`. For kernel backed
+/// resources it often makes sense to implement `read_byob()` first, and then
+/// implement `read()` as an operation that allocates a new chunk with
+/// `len == limit`, then calls `read_byob()`, and then returns a chunk sliced to
+/// the number of bytes read. Kernel backed resources can use the
+/// [deno_core::impl_readable_byob] macro to implement optimized `read_byob()`
+/// and `read()` implementations from a single `Self::read()` method.
+///
+/// ### Writable
+///
+/// Writable resources are resources that can have data written to. Examples of
+/// this are files, sockets, or HTTP streams.
+///
+/// Writables can be written to from either JS or Rust. In JS one can use
+/// `Deno.core.write()` to write to a single chunk of data to a writable. In
+/// Rust one can directly call `write()`. The latter is used to implement ops
+/// like `op_slice`.
pub trait Resource: Any + 'static {
/// Returns a string representation of the resource which is made available
/// to JavaScript code through `op_resources`. The default implementation
@@ -35,20 +79,86 @@ pub trait Resource: Any + 'static {
type_name::<Self>().into()
}
- /// Resources may implement `read_return()` to be a readable stream
- fn read_return(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ /// Read a single chunk of data from the resource. This operation returns a
+ /// `BufView` that represents the data that was read. If a zero length buffer
+ /// is returned, it indicates that the resource has reached EOF.
+ ///
+ /// If this method is not implemented, the default implementation will error
+ /// with a "not supported" error.
+ ///
+ /// If a readable can provide an optimized path for BYOBs, it should also
+ /// implement `read_byob()`.
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ _ = limit;
Box::pin(futures::future::err(not_supported()))
}
- /// Resources may implement `write()` to be a writable stream
- fn write(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ /// Read a single chunk of data from the resource into the provided `BufMutView`.
+ ///
+ /// This operation returns the number of bytes read. If zero bytes are read,
+ /// it indicates that the resource has reached EOF.
+ ///
+ /// If this method is not implemented explicitly, the default implementation
+ /// will call `read()` and then copy the data into the provided buffer. For
+ /// readable resources that can provide an optimized path for BYOBs, it is
+ /// strongly recommended to override this method.
+ fn read_byob(
+ self: Rc<Self>,
+ mut buf: BufMutView,
+ ) -> AsyncResult<(usize, BufMutView)> {
+ Box::pin(async move {
+ let read = self.read(buf.len()).await?;
+ let nread = read.len();
+ buf[..nread].copy_from_slice(&read);
+ Ok((nread, buf))
+ })
+ }
+
+ /// Write a single chunk of data to the resource. The operation may not be
+ /// able to write the entire chunk, in which case it should return the number
+ /// of bytes written. Additionally it should return the `BufView` that was
+ /// passed in.
+ ///
+ /// If this method is not implemented, the default implementation will error
+ /// with a "not supported" error.
+ fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
+ _ = buf;
Box::pin(futures::future::err(not_supported()))
}
- /// Resources may implement `shutdown()` for graceful async shutdowns
+ /// Write an entire chunk of data to the resource. Unlike `write()`, this will
+ /// ensure the entire chunk is written. If the operation is not able to write
+ /// the entire chunk, an error is to be returned.
+ ///
+ /// By default this method will call `write()` repeatedly until the entire
+ /// chunk is written. Resources that can write the entire chunk in a single
+ /// operation using an optimized path should override this method.
+ fn write_all(self: Rc<Self>, view: BufView) -> AsyncResult<()> {
+ Box::pin(async move {
+ let mut view = view;
+ let this = self;
+ while !view.is_empty() {
+ let resp = this.clone().write(view).await?;
+ match resp {
+ WriteOutcome::Partial {
+ nwritten,
+ view: new_view,
+ } => {
+ view = new_view;
+ view.advance_cursor(nwritten);
+ }
+ WriteOutcome::Full { .. } => break,
+ }
+ }
+ Ok(())
+ })
+ }
+
+ /// The shutdown method can be used to asynchronously close the resource. It
+ /// is not automatically called when the resource is dropped or closed.
+ ///
+ /// If this method is not implemented, the default implementation will error
+ /// with a "not supported" error.
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(futures::future::err(not_supported()))
}
@@ -229,3 +339,60 @@ impl ResourceTable {
.map(|(&id, resource)| (id, resource.name()))
}
}
+
+#[macro_export]
+macro_rules! impl_readable_byob {
+ () => {
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<$crate::BufView> {
+ Box::pin(async move {
+ let mut vec = vec![0; limit];
+ let nread = self.read(&mut vec).await?;
+ if nread != vec.len() {
+ vec.truncate(nread);
+ }
+ let view = $crate::BufView::from(vec);
+ Ok(view)
+ })
+ }
+
+ fn read_byob(
+ self: Rc<Self>,
+ mut buf: $crate::BufMutView,
+ ) -> AsyncResult<(usize, $crate::BufMutView)> {
+ Box::pin(async move {
+ let nread = self.read(buf.as_mut()).await?;
+ Ok((nread, buf))
+ })
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! impl_writable {
+ (__write) => {
+ fn write(
+ self: Rc<Self>,
+ view: $crate::BufView,
+ ) -> AsyncResult<$crate::WriteOutcome> {
+ Box::pin(async move {
+ let nwritten = self.write(&view).await?;
+ Ok($crate::WriteOutcome::Partial { nwritten, view })
+ })
+ }
+ };
+ (__write_all) => {
+ fn write_all(self: Rc<Self>, view: $crate::BufView) -> AsyncResult<()> {
+ Box::pin(async move {
+ self.write_all(&view).await?;
+ Ok(())
+ })
+ }
+ };
+ () => {
+ $crate::impl_writable!(__write);
+ };
+ (with_all) => {
+ $crate::impl_writable!(__write);
+ $crate::impl_writable!(__write_all);
+ };
+}