summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--cli/tests/unit/fetch_test.ts4
-rw-r--r--core/Cargo.toml1
-rw-r--r--core/examples/http_bench_json_ops.rs31
-rw-r--r--core/io.rs271
-rw-r--r--core/lib.rs4
-rw-r--r--core/ops_builtin.rs74
-rw-r--r--core/resources.rs191
-rw-r--r--ext/cache/sqlite.rs27
-rw-r--r--ext/fetch/lib.rs63
-rw-r--r--ext/flash/lib.rs12
-rw-r--r--ext/http/lib.rs110
-rw-r--r--ext/net/io.rs57
-rw-r--r--ext/net/ops_tls.rs30
-rw-r--r--runtime/ops/io.rs92
15 files changed, 708 insertions, 260 deletions
diff --git a/Cargo.lock b/Cargo.lock
index aaff2475f..68212f05b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -967,6 +967,7 @@ name = "deno_core"
version = "0.154.0"
dependencies = [
"anyhow",
+ "bytes",
"deno_ast",
"deno_ops",
"futures",
diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts
index e2ff0d5e0..a668bb480 100644
--- a/cli/tests/unit/fetch_test.ts
+++ b/cli/tests/unit/fetch_test.ts
@@ -869,13 +869,13 @@ Deno.test(
);
Deno.test(function responseRedirect() {
- const redir = Response.redirect("example.com/newLocation", 301);
+ const redir = Response.redirect("http://example.com/newLocation", 301);
assertEquals(redir.status, 301);
assertEquals(redir.statusText, "");
assertEquals(redir.url, "");
assertEquals(
redir.headers.get("Location"),
- "http://js-unit-tests/foo/example.com/newLocation",
+ "http://example.com/newLocation",
);
assertEquals(redir.type, "default");
});
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 82ceab2a0..57dafa0e5 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -18,6 +18,7 @@ v8_use_custom_libcxx = ["v8/use_custom_libcxx"]
[dependencies]
anyhow = "1.0.57"
+bytes = "1"
deno_ops = { path = "../ops", version = "0.32.0" }
futures = "0.3.21"
# Stay on 1.6 to avoid a dependency cycle in ahash https://github.com/tkaitchuck/aHash/issues/95
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index 7c895f326..6d61d35ec 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -10,7 +10,6 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
use std::cell::RefCell;
use std::env;
use std::net::SocketAddr;
@@ -83,37 +82,23 @@ struct TcpStream {
}
impl TcpStream {
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), Error> {
+ async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, Error> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = rd
- .read(&mut buf)
- .try_or_cancel(cancel)
- .await
- .map_err(Error::from)?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(cancel).await?;
+ Ok(nread)
}
- async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
+ async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, Error> {
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
- wr.write(&buf).await.map_err(Error::from)
+ let nwritten = wr.write(data).await?;
+ Ok(nwritten)
}
}
impl Resource for TcpStream {
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
fn close(self: Rc<Self>) {
self.cancel.cancel()
diff --git a/core/io.rs b/core/io.rs
new file mode 100644
index 000000000..7baad12e4
--- /dev/null
+++ b/core/io.rs
@@ -0,0 +1,271 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::ops::Deref;
+use std::ops::DerefMut;
+
+use serde_v8::ZeroCopyBuf;
+
+/// BufView is a wrapper around an underlying contiguous chunk of bytes. It can
+/// be created from a [ZeroCopyBuf], [bytes::Bytes], or [Vec<u8>] and implements
+/// `Deref<[u8]>` and `AsRef<[u8]>`.
+///
+/// The wrapper has the ability to constrain the exposed view to a sub-region of
+/// the underlying buffer. This is useful for write operations, because they may
+/// have to be called multiple times, with different views onto the buffer to be
+/// able to write it entirely.
+pub struct BufView {
+ inner: BufViewInner,
+ cursor: usize,
+}
+
+enum BufViewInner {
+ Empty,
+ Bytes(bytes::Bytes),
+ ZeroCopy(ZeroCopyBuf),
+ Vec(Vec<u8>),
+}
+
+impl BufView {
+ fn from_inner(inner: BufViewInner) -> Self {
+ Self { inner, cursor: 0 }
+ }
+
+ pub fn empty() -> Self {
+ Self::from_inner(BufViewInner::Empty)
+ }
+
+ /// Get the length of the buffer view. This is the length of the underlying
+ /// buffer minus the cursor position.
+ pub fn len(&self) -> usize {
+ match &self.inner {
+ BufViewInner::Empty => 0,
+ BufViewInner::Bytes(bytes) => bytes.len() - self.cursor,
+ BufViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor,
+ BufViewInner::Vec(vec) => vec.len() - self.cursor,
+ }
+ }
+
+ /// Is the buffer view empty?
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Advance the internal cursor of the buffer view by `n` bytes.
+ pub fn advance_cursor(&mut self, n: usize) {
+ assert!(self.len() >= n);
+ self.cursor += n;
+ }
+
+ /// Reset the internal cursor of the buffer view to the beginning of the
+ /// buffer. Returns the old cursor position.
+ pub fn reset_cursor(&mut self) -> usize {
+ let old = self.cursor;
+ self.cursor = 0;
+ old
+ }
+}
+
+impl Deref for BufView {
+ type Target = [u8];
+
+ fn deref(&self) -> &[u8] {
+ let buf = match &self.inner {
+ BufViewInner::Empty => &[],
+ BufViewInner::Bytes(bytes) => bytes.deref(),
+ BufViewInner::ZeroCopy(zero_copy) => zero_copy.deref(),
+ BufViewInner::Vec(vec) => vec.deref(),
+ };
+ &buf[self.cursor..]
+ }
+}
+
+impl AsRef<[u8]> for BufView {
+ fn as_ref(&self) -> &[u8] {
+ self.deref()
+ }
+}
+
+impl From<ZeroCopyBuf> for BufView {
+ fn from(buf: ZeroCopyBuf) -> Self {
+ Self::from_inner(BufViewInner::ZeroCopy(buf))
+ }
+}
+
+impl From<Vec<u8>> for BufView {
+ fn from(vec: Vec<u8>) -> Self {
+ Self::from_inner(BufViewInner::Vec(vec))
+ }
+}
+
+impl From<bytes::Bytes> for BufView {
+ fn from(buf: bytes::Bytes) -> Self {
+ Self::from_inner(BufViewInner::Bytes(buf))
+ }
+}
+
+impl From<BufView> for bytes::Bytes {
+ fn from(buf: BufView) -> Self {
+ match buf.inner {
+ BufViewInner::Empty => bytes::Bytes::new(),
+ BufViewInner::Bytes(bytes) => bytes,
+ BufViewInner::ZeroCopy(zero_copy) => zero_copy.into(),
+ BufViewInner::Vec(vec) => vec.into(),
+ }
+ }
+}
+
+/// BufMutView is a wrapper around an underlying contiguous chunk of writable
+/// bytes. It can be created from a `ZeroCopyBuf` or a `Vec<u8>` and implements
+/// `DerefMut<[u8]>` and `AsMut<[u8]>`.
+///
+/// The wrapper has the ability to constrain the exposed view to a sub-region of
+/// the underlying buffer. This is useful for write operations, because they may
+/// have to be called multiple times, with different views onto the buffer to be
+/// able to write it entirely.
+///
+/// A `BufMutView` can be turned into a `BufView` by calling `BufMutView::into_view`.
+pub struct BufMutView {
+ inner: BufMutViewInner,
+ cursor: usize,
+}
+
+enum BufMutViewInner {
+ ZeroCopy(ZeroCopyBuf),
+ Vec(Vec<u8>),
+}
+
+impl BufMutView {
+ fn from_inner(inner: BufMutViewInner) -> Self {
+ Self { inner, cursor: 0 }
+ }
+
+ pub fn new(len: usize) -> Self {
+ Self::from_inner(BufMutViewInner::Vec(vec![0; len]))
+ }
+
+ /// Get the length of the buffer view. This is the length of the underlying
+ /// buffer minus the cursor position.
+ pub fn len(&self) -> usize {
+ match &self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor,
+ BufMutViewInner::Vec(vec) => vec.len() - self.cursor,
+ }
+ }
+
+ /// Is the buffer view empty?
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Advance the internal cursor of the buffer view by `n` bytes.
+ pub fn advance_cursor(&mut self, n: usize) {
+ assert!(self.len() >= n);
+ self.cursor += n;
+ }
+
+ /// Reset the internal cursor of the buffer view to the beginning of the
+ /// buffer. Returns the old cursor position.
+ pub fn reset_cursor(&mut self) -> usize {
+ let old = self.cursor;
+ self.cursor = 0;
+ old
+ }
+
+ /// Turn this `BufMutView` into a `BufView`.
+ pub fn into_view(self) -> BufView {
+ let inner = match self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => BufViewInner::ZeroCopy(zero_copy),
+ BufMutViewInner::Vec(vec) => BufViewInner::Vec(vec),
+ };
+ BufView {
+ inner,
+ cursor: self.cursor,
+ }
+ }
+
+ /// Unwrap the underlying buffer into a `Vec<u8>`, consuming the `BufMutView`.
+ ///
+ /// This method panics when called on a `BufMutView` that was created from a
+ /// `ZeroCopyBuf`.
+ pub fn unwrap_vec(self) -> Vec<u8> {
+ match self.inner {
+ BufMutViewInner::ZeroCopy(_) => {
+ panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec");
+ }
+ BufMutViewInner::Vec(vec) => vec,
+ }
+ }
+
+ /// Get a mutable reference to an underlying `Vec<u8>`.
+ ///
+ /// This method panics when called on a `BufMutView` that was created from a
+ /// `ZeroCopyBuf`.
+ pub fn get_mut_vec(&mut self) -> &mut Vec<u8> {
+ match &mut self.inner {
+ BufMutViewInner::ZeroCopy(_) => {
+ panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec");
+ }
+ BufMutViewInner::Vec(vec) => vec,
+ }
+ }
+}
+
+impl Deref for BufMutView {
+ type Target = [u8];
+
+ fn deref(&self) -> &[u8] {
+ let buf = match &self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref(),
+ BufMutViewInner::Vec(vec) => vec.deref(),
+ };
+ &buf[self.cursor..]
+ }
+}
+
+impl DerefMut for BufMutView {
+ fn deref_mut(&mut self) -> &mut [u8] {
+ let buf = match &mut self.inner {
+ BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref_mut(),
+ BufMutViewInner::Vec(vec) => vec.deref_mut(),
+ };
+ &mut buf[self.cursor..]
+ }
+}
+
+impl AsRef<[u8]> for BufMutView {
+ fn as_ref(&self) -> &[u8] {
+ self.deref()
+ }
+}
+
+impl AsMut<[u8]> for BufMutView {
+ fn as_mut(&mut self) -> &mut [u8] {
+ self.deref_mut()
+ }
+}
+
+impl From<ZeroCopyBuf> for BufMutView {
+ fn from(buf: ZeroCopyBuf) -> Self {
+ Self::from_inner(BufMutViewInner::ZeroCopy(buf))
+ }
+}
+
+impl From<Vec<u8>> for BufMutView {
+ fn from(buf: Vec<u8>) -> Self {
+ Self::from_inner(BufMutViewInner::Vec(buf))
+ }
+}
+
+pub enum WriteOutcome {
+ Partial { nwritten: usize, view: BufView },
+ Full { nwritten: usize },
+}
+
+impl WriteOutcome {
+ pub fn nwritten(&self) -> usize {
+ match self {
+ WriteOutcome::Partial { nwritten, .. } => *nwritten,
+ WriteOutcome::Full { nwritten } => *nwritten,
+ }
+ }
+}
diff --git a/core/lib.rs b/core/lib.rs
index 8043b5e95..adda98046 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -8,6 +8,7 @@ mod extensions;
mod flags;
mod gotham_state;
mod inspector;
+mod io;
mod module_specifier;
mod modules;
mod normalize_path;
@@ -58,6 +59,9 @@ pub use crate::inspector::InspectorMsgKind;
pub use crate::inspector::InspectorSessionProxy;
pub use crate::inspector::JsRuntimeInspector;
pub use crate::inspector::LocalInspectorSession;
+pub use crate::io::BufMutView;
+pub use crate::io::BufView;
+pub use crate::io::WriteOutcome;
pub use crate::module_specifier::resolve_import;
pub use crate::module_specifier::resolve_path;
pub use crate::module_specifier::resolve_url;
diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs
index 7393d4b69..41741bf28 100644
--- a/core/ops_builtin.rs
+++ b/core/ops_builtin.rs
@@ -2,6 +2,8 @@
use crate::error::format_file_name;
use crate::error::type_error;
use crate::include_js_files;
+use crate::io::BufMutView;
+use crate::io::BufView;
use crate::ops_metrics::OpMetrics;
use crate::resources::ResourceId;
use crate::Extension;
@@ -166,7 +168,8 @@ async fn op_read(
buf: ZeroCopyBuf,
) -> Result<u32, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- resource.read_return(buf).await.map(|(n, _)| n as u32)
+ let view = BufMutView::from(buf);
+ resource.read_byob(view).await.map(|(n, _)| n as u32)
}
#[op]
@@ -175,18 +178,67 @@ async fn op_read_all(
rid: ResourceId,
) -> Result<ZeroCopyBuf, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- let (min, maximum) = resource.size_hint();
- let size = maximum.unwrap_or(min) as usize;
- let mut buffer = Vec::with_capacity(size);
+ // The number of bytes we attempt to grow the buffer by each time it fills
+ // up and we have more data to read. We start at 64 KB. The grow_len is
+ // doubled if the nread returned from a single read is equal or greater than
+ // the grow_len. This allows us to reduce allocations for resources that can
+ // read large chunks of data at a time.
+ let mut grow_len: usize = 64 * 1024;
+
+ let (min, maybe_max) = resource.size_hint();
+ // Try to determine an optimial starting buffer size for this resource based
+ // on the size hint.
+ let initial_size = match (min, maybe_max) {
+ (min, Some(max)) if min == max => min as usize,
+ (_min, Some(max)) if (max as usize) < grow_len => max as usize,
+ (min, _) if (min as usize) < grow_len => grow_len,
+ (min, _) => min as usize,
+ };
+
+ let mut buf = BufMutView::new(initial_size);
loop {
- let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]);
- let (nread, tmp) = resource.clone().read_return(tmp).await?;
- if nread == 0 {
- return Ok(buffer.into());
+ // if the buffer does not have much remaining space, we may have to grow it.
+ if buf.len() < grow_len {
+ let vec = buf.get_mut_vec();
+ match maybe_max {
+ Some(max) if vec.len() >= max as usize => {
+ // no need to resize the vec, because the vec is already large enough
+ // to accomodate the maximum size of the read data.
+ }
+ Some(max) if (max as usize) < vec.len() + grow_len => {
+ // grow the vec to the maximum size of the read data
+ vec.resize(max as usize, 0);
+ }
+ _ => {
+ // grow the vec by grow_len
+ vec.resize(vec.len() + grow_len, 0);
+ }
+ }
+ }
+ let (n, new_buf) = resource.clone().read_byob(buf).await?;
+ buf = new_buf;
+ buf.advance_cursor(n);
+ if n == 0 {
+ break;
+ }
+ if n >= grow_len {
+ // we managed to read more or equal data than fits in a single grow_len in
+ // a single go, so let's attempt to read even more next time. this reduces
+ // allocations for resources that can read large chunks of data at a time.
+ grow_len *= 2;
}
- buffer.extend_from_slice(&tmp[..nread]);
}
+
+ let nread = buf.reset_cursor();
+ let mut vec = buf.unwrap_vec();
+ // If the buffer is larger than the amount of data read, shrink it to the
+ // amount of data read.
+ if nread < vec.len() {
+ vec.truncate(nread);
+ }
+
+ Ok(ZeroCopyBuf::from(vec))
}
#[op]
@@ -196,7 +248,9 @@ async fn op_write(
buf: ZeroCopyBuf,
) -> Result<u32, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- resource.write(buf).await.map(|n| n as u32)
+ let view = BufView::from(buf);
+ let resp = resource.write(view).await?;
+ Ok(resp.nwritten() as u32)
}
#[op]
diff --git a/core/resources.rs b/core/resources.rs
index 1a1ba3193..ee9ee689f 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -8,7 +8,9 @@
use crate::error::bad_resource_id;
use crate::error::not_supported;
-use crate::ZeroCopyBuf;
+use crate::io::BufMutView;
+use crate::io::BufView;
+use crate::io::WriteOutcome;
use anyhow::Error;
use futures::Future;
use std::any::type_name;
@@ -23,9 +25,51 @@ use std::rc::Rc;
/// Returned by resource read/write/shutdown methods
pub type AsyncResult<T> = Pin<Box<dyn Future<Output = Result<T, Error>>>>;
-/// All objects that can be store in the resource table should implement the
-/// `Resource` trait.
-/// TODO(@AaronO): investigate avoiding alloc on read/write/shutdown
+/// Resources are Rust objects that are attached to a [deno_core::JsRuntime].
+/// They are identified in JS by a numeric ID (the resource ID, or rid).
+/// Resources can be created in ops. Resources can also be retrieved in ops by
+/// their rid. Resources are not thread-safe - they can only be accessed from
+/// the thread that the JsRuntime lives on.
+///
+/// Resources are reference counted in Rust. This means that they can be
+/// cloned and passed around. When the last reference is dropped, the resource
+/// is automatically closed. As long as the resource exists in the resource
+/// table, the reference count is at least 1.
+///
+/// ### Readable
+///
+/// Readable resources are resources that can have data read from. Examples of
+/// this are files, sockets, or HTTP streams.
+///
+/// Readables can be read from from either JS or Rust. In JS one can use
+/// `Deno.core.read()` to read from a single chunk of data from a readable. In
+/// Rust one can directly call `read()` or `read_byob()`. The Rust side code is
+/// used to implement ops like `op_slice`.
+///
+/// A distinction can be made between readables that produce chunks of data
+/// themselves (they allocate the chunks), and readables that fill up
+/// bring-your-own-buffers (BYOBs). The former is often the case for framed
+/// protocols like HTTP, while the latter is often the case for kernel backed
+/// resources like files and sockets.
+///
+/// All readables must implement `read()`. If resources can support an optimized
+/// path for BYOBs, they should also implement `read_byob()`. For kernel backed
+/// resources it often makes sense to implement `read_byob()` first, and then
+/// implement `read()` as an operation that allocates a new chunk with
+/// `len == limit`, then calls `read_byob()`, and then returns a chunk sliced to
+/// the number of bytes read. Kernel backed resources can use the
+/// [deno_core::impl_readable_byob] macro to implement optimized `read_byob()`
+/// and `read()` implementations from a single `Self::read()` method.
+///
+/// ### Writable
+///
+/// Writable resources are resources that can have data written to. Examples of
+/// this are files, sockets, or HTTP streams.
+///
+/// Writables can be written to from either JS or Rust. In JS one can use
+/// `Deno.core.write()` to write to a single chunk of data to a writable. In
+/// Rust one can directly call `write()`. The latter is used to implement ops
+/// like `op_slice`.
pub trait Resource: Any + 'static {
/// Returns a string representation of the resource which is made available
/// to JavaScript code through `op_resources`. The default implementation
@@ -35,20 +79,86 @@ pub trait Resource: Any + 'static {
type_name::<Self>().into()
}
- /// Resources may implement `read_return()` to be a readable stream
- fn read_return(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ /// Read a single chunk of data from the resource. This operation returns a
+ /// `BufView` that represents the data that was read. If a zero length buffer
+ /// is returned, it indicates that the resource has reached EOF.
+ ///
+ /// If this method is not implemented, the default implementation will error
+ /// with a "not supported" error.
+ ///
+ /// If a readable can provide an optimized path for BYOBs, it should also
+ /// implement `read_byob()`.
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ _ = limit;
Box::pin(futures::future::err(not_supported()))
}
- /// Resources may implement `write()` to be a writable stream
- fn write(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ /// Read a single chunk of data from the resource into the provided `BufMutView`.
+ ///
+ /// This operation returns the number of bytes read. If zero bytes are read,
+ /// it indicates that the resource has reached EOF.
+ ///
+ /// If this method is not implemented explicitly, the default implementation
+ /// will call `read()` and then copy the data into the provided buffer. For
+ /// readable resources that can provide an optimized path for BYOBs, it is
+ /// strongly recommended to override this method.
+ fn read_byob(
+ self: Rc<Self>,
+ mut buf: BufMutView,
+ ) -> AsyncResult<(usize, BufMutView)> {
+ Box::pin(async move {
+ let read = self.read(buf.len()).await?;
+ let nread = read.len();
+ buf[..nread].copy_from_slice(&read);
+ Ok((nread, buf))
+ })
+ }
+
+ /// Write a single chunk of data to the resource. The operation may not be
+ /// able to write the entire chunk, in which case it should return the number
+ /// of bytes written. Additionally it should return the `BufView` that was
+ /// passed in.
+ ///
+ /// If this method is not implemented, the default implementation will error
+ /// with a "not supported" error.
+ fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
+ _ = buf;
Box::pin(futures::future::err(not_supported()))
}
- /// Resources may implement `shutdown()` for graceful async shutdowns
+ /// Write an entire chunk of data to the resource. Unlike `write()`, this will
+ /// ensure the entire chunk is written. If the operation is not able to write
+ /// the entire chunk, an error is to be returned.
+ ///
+ /// By default this method will call `write()` repeatedly until the entire
+ /// chunk is written. Resources that can write the entire chunk in a single
+ /// operation using an optimized path should override this method.
+ fn write_all(self: Rc<Self>, view: BufView) -> AsyncResult<()> {
+ Box::pin(async move {
+ let mut view = view;
+ let this = self;
+ while !view.is_empty() {
+ let resp = this.clone().write(view).await?;
+ match resp {
+ WriteOutcome::Partial {
+ nwritten,
+ view: new_view,
+ } => {
+ view = new_view;
+ view.advance_cursor(nwritten);
+ }
+ WriteOutcome::Full { .. } => break,
+ }
+ }
+ Ok(())
+ })
+ }
+
+ /// The shutdown method can be used to asynchronously close the resource. It
+ /// is not automatically called when the resource is dropped or closed.
+ ///
+ /// If this method is not implemented, the default implementation will error
+ /// with a "not supported" error.
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(futures::future::err(not_supported()))
}
@@ -229,3 +339,60 @@ impl ResourceTable {
.map(|(&id, resource)| (id, resource.name()))
}
}
+
+#[macro_export]
+macro_rules! impl_readable_byob {
+ () => {
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<$crate::BufView> {
+ Box::pin(async move {
+ let mut vec = vec![0; limit];
+ let nread = self.read(&mut vec).await?;
+ if nread != vec.len() {
+ vec.truncate(nread);
+ }
+ let view = $crate::BufView::from(vec);
+ Ok(view)
+ })
+ }
+
+ fn read_byob(
+ self: Rc<Self>,
+ mut buf: $crate::BufMutView,
+ ) -> AsyncResult<(usize, $crate::BufMutView)> {
+ Box::pin(async move {
+ let nread = self.read(buf.as_mut()).await?;
+ Ok((nread, buf))
+ })
+ }
+ };
+}
+
+#[macro_export]
+macro_rules! impl_writable {
+ (__write) => {
+ fn write(
+ self: Rc<Self>,
+ view: $crate::BufView,
+ ) -> AsyncResult<$crate::WriteOutcome> {
+ Box::pin(async move {
+ let nwritten = self.write(&view).await?;
+ Ok($crate::WriteOutcome::Partial { nwritten, view })
+ })
+ }
+ };
+ (__write_all) => {
+ fn write_all(self: Rc<Self>, view: $crate::BufView) -> AsyncResult<()> {
+ Box::pin(async move {
+ self.write_all(&view).await?;
+ Ok(())
+ })
+ }
+ };
+ () => {
+ $crate::impl_writable!(__write);
+ };
+ (with_all) => {
+ $crate::impl_writable!(__write);
+ $crate::impl_writable!(__write_all);
+ };
+}
diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs
index 7e97fb563..75aa7cc6e 100644
--- a/ext/cache/sqlite.rs
+++ b/ext/cache/sqlite.rs
@@ -7,7 +7,6 @@ use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::Resource;
-use deno_core::ZeroCopyBuf;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
@@ -347,10 +346,10 @@ pub struct CachePutResource {
}
impl CachePutResource {
- async fn write(self: Rc<Self>, data: ZeroCopyBuf) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
- file.write_all(&data).await?;
+ file.write_all(data).await?;
Ok(data.len())
}
@@ -374,9 +373,7 @@ impl Resource for CachePutResource {
"CachePutResource".into()
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
+ deno_core::impl_writable!();
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
@@ -394,28 +391,20 @@ impl CacheResponseResource {
}
}
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
- let nread = file.read(&mut buf).await?;
- Ok((nread, buf))
+ let nread = file.read(data).await?;
+ Ok(nread)
}
}
impl Resource for CacheResponseResource {
+ deno_core::impl_readable_byob!();
+
fn name(&self) -> Cow<str> {
"CacheResponseResource".into()
}
-
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
}
pub fn hash(token: &str) -> String {
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 0adc32343..b8f784284 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -5,11 +5,14 @@ mod fs_fetch_handler;
use data_url::DataUrl;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::include_js_files;
use deno_core::op;
+use deno_core::BufView;
+use deno_core::WriteOutcome;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
@@ -43,15 +46,14 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::cmp::min;
use std::convert::From;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
-use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use tokio_util::io::StreamReader;
// Re-export reqwest and data_url
pub use data_url;
@@ -252,7 +254,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+ let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1);
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -401,12 +403,11 @@ pub async fn op_fetch_send(
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
- let stream_reader = StreamReader::new(stream);
let rid = state
.borrow_mut()
.resource_table
.add(FetchResponseBodyResource {
- reader: AsyncRefCell::new(stream_reader),
+ reader: AsyncRefCell::new(stream.peekable()),
cancel: CancelHandle::default(),
size: content_length,
});
@@ -446,7 +447,7 @@ impl Resource for FetchCancelHandle {
}
pub struct FetchRequestBodyResource {
- body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>,
+ body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>,
cancel: CancelHandle,
}
@@ -455,17 +456,16 @@ impl Resource for FetchRequestBodyResource {
"fetchRequestBody".into()
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
Box::pin(async move {
- let data = buf.to_vec();
- let len = data.len();
+ let bytes: bytes::Bytes = buf.into();
+ let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| {
+ body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
type_error("request body receiver not connected (request closed)")
})?;
-
- Ok(len)
+ Ok(WriteOutcome::Full { nwritten })
})
}
@@ -478,7 +478,7 @@ type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
struct FetchResponseBodyResource {
- reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
+ reader: AsyncRefCell<Peekable<BytesStream>>,
cancel: CancelHandle,
size: Option<u64>,
}
@@ -488,15 +488,36 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
- fn read_return(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(async move {
- let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok((read, buf))
+ let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
+
+ let fut = async move {
+ let mut reader = Pin::new(reader);
+ loop {
+ match reader.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(limit, chunk.len());
+ let chunk = chunk.split_to(len);
+ break Ok(chunk.into());
+ }
+ // This unwrap is safe because `peek_mut()` returned `Some`, and thus
+ // currently has a peeked value that can be synchronously returned
+ // from `next()`.
+ //
+ // The future returned from `next()` is always ready, so we can
+ // safely call `await` on it without creating a race condition.
+ Some(_) => match reader.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(BufView::empty()),
+ }
+ }
+ };
+
+ let cancel_handle = RcRef::map(self, |r| &r.cancel);
+ fut.try_or_cancel(cancel_handle).await
})
}
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index a7bd8b439..f9ce1c744 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -253,20 +253,16 @@ async fn op_flash_write_resource(
.write_all(b"Transfer-Encoding: chunked\r\n\r\n")
.await?;
loop {
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
+ let view = resource.clone().read(64 * 1024).await?; // 64KB
+ if view.is_empty() {
stream.write_all(b"0\r\n\r\n").await?;
break;
}
-
- let response = &buf[..nread];
// TODO(@littledivy): use vectored writes.
stream
- .write_all(format!("{:x}\r\n", response.len()).as_bytes())
+ .write_all(format!("{:x}\r\n", view.len()).as_bytes())
.await?;
- stream.write_all(response).await?;
+ stream.write_all(&view).await?;
stream.write_all(b"\r\n").await?;
}
resource.close();
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index a8c2810bc..e71d9fae3 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -23,6 +23,8 @@ use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op;
use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -333,61 +335,58 @@ impl HttpStreamResource {
}
}
-impl HttpStreamResource {
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
- let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
-
- let body = loop {
- match &mut *rd {
- HttpRequestReader::Headers(_) => {}
- HttpRequestReader::Body(_, body) => break body,
- HttpRequestReader::Closed => return Ok((0, buf)),
- }
- match take(&mut *rd) {
- HttpRequestReader::Headers(request) => {
- let (parts, body) = request.into_parts();
- *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+impl Resource for HttpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "httpStream".into()
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(async move {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+
+ let body = loop {
+ match &mut *rd {
+ HttpRequestReader::Headers(_) => {}
+ HttpRequestReader::Body(_, body) => break body,
+ HttpRequestReader::Closed => return Ok(BufView::empty()),
}
- _ => unreachable!(),
+ match take(&mut *rd) {
+ HttpRequestReader::Headers(request) => {
+ let (parts, body) = request.into_parts();
+ *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+ }
+ _ => unreachable!(),
+ };
};
- };
- let fut = async {
- let mut body = Pin::new(body);
- loop {
- match body.as_mut().peek_mut().await {
- Some(Ok(chunk)) if !chunk.is_empty() => {
- let len = min(buf.len(), chunk.len());
- buf[..len].copy_from_slice(&chunk.split_to(len));
- break Ok((len, buf));
+ let fut = async {
+ let mut body = Pin::new(body);
+ loop {
+ match body.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(limit, chunk.len());
+ let buf = chunk.split_to(len);
+ let view = BufView::from(buf);
+ break Ok(view);
+ }
+ // This unwrap is safe because `peek_mut()` returned `Some`, and thus
+ // currently has a peeked value that can be synchronously returned
+ // from `next()`.
+ //
+ // The future returned from `next()` is always ready, so we can
+ // safely call `await` on it without creating a race condition.
+ Some(_) => match body.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(BufView::empty()),
}
- Some(_) => match body.as_mut().next().await.unwrap() {
- Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
- },
- None => break Ok((0, buf)),
}
- }
- };
-
- let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
- fut.try_or_cancel(cancel_handle).await
- }
-}
-
-impl Resource for HttpStreamResource {
- fn name(&self) -> Cow<str> {
- "httpStream".into()
- }
+ };
- fn read_return(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(_buf))
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
+ fut.try_or_cancel(cancel_handle).await
+ })
}
fn close(self: Rc<Self>) {
@@ -763,16 +762,14 @@ async fn op_http_write_resource(
_ => {}
};
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
+ let view = resource.clone().read(64 * 1024).await?; // 64KB
+ if view.is_empty() {
break;
}
match &mut *wr {
HttpResponseWriter::Body(body) => {
- if let Err(err) = body.write_all(&buf[..nread]).await {
+ if let Err(err) = body.write_all(&view).await {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
@@ -782,9 +779,8 @@ async fn op_http_write_resource(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
- let mut buf = buf.to_temp();
- buf.truncate(nread);
- if let Err(err) = body.send_data(Bytes::from(buf)).await {
+ let bytes = Bytes::from(view);
+ if let Err(err) = body.send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
diff --git a/ext/net/io.rs b/ext/net/io.rs
index c9587c851..4c9fbe3d2 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -9,7 +9,6 @@ use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
-use deno_core::ZeroCopyBuf;
use socket2::SockRef;
use std::borrow::Cow;
use std::rc::Rc;
@@ -69,22 +68,16 @@ where
pub async fn read(
self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ data: &mut [u8],
+ ) -> Result<usize, AnyError> {
let mut rd = self.rd_borrow_mut().await;
- let nread = rd
- .read(&mut buf)
- .try_or_cancel(self.cancel_handle())
- .await?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
}
- pub async fn write(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let mut wr = self.wr_borrow_mut().await;
- let nwritten = wr.write(&buf).await?;
+ let nwritten = wr.write(data).await?;
Ok(nwritten)
}
@@ -99,21 +92,13 @@ pub type TcpStreamResource =
FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
impl Resource for TcpStreamResource {
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+
fn name(&self) -> Cow<str> {
"tcpStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
-
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
@@ -161,16 +146,10 @@ pub struct UnixStreamResource;
#[cfg(not(unix))]
impl UnixStreamResource {
- pub async fn read(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ fn read(self: Rc<Self>, data: &mut [u8]) -> AsyncResult<usize> {
unreachable!()
}
- pub async fn write(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ fn write(self: Rc<Self>, data: &[u8]) -> AsyncResult<usize> {
unreachable!()
}
pub async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> {
@@ -182,21 +161,13 @@ impl UnixStreamResource {
}
impl Resource for UnixStreamResource {
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+
fn name(&self) -> Cow<str> {
"unixStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
-
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index 230f4359e..a1b48b84e 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -38,7 +38,6 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
use deno_tls::create_client_config;
use deno_tls::load_certs;
use deno_tls::load_private_keys;
@@ -691,21 +690,18 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ data: &mut [u8],
+ ) -> Result<usize, AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
- let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(cancel_handle).await?;
+ Ok(nread)
}
- pub async fn write(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
self.handshake().await?;
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
- let nwritten = wr.write(&buf).await?;
+ let nwritten = wr.write(data).await?;
wr.flush().await?;
Ok(nwritten)
}
@@ -736,21 +732,13 @@ impl TlsStreamResource {
}
impl Resource for TlsStreamResource {
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+
fn name(&self) -> Cow<str> {
"tlsStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
-
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 18c7fb5e5..8ed6969f9 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -7,6 +7,8 @@ use deno_core::parking_lot::Mutex;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufMutView;
+use deno_core::BufView;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Extension;
@@ -202,9 +204,9 @@ where
RcRef::map(self, |r| &r.stream).borrow_mut()
}
- async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let mut stream = self.borrow_mut().await;
- let nwritten = stream.write(&buf).await?;
+ let nwritten = stream.write(data).await?;
Ok(nwritten)
}
@@ -250,16 +252,10 @@ where
self.cancel_handle.cancel()
}
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
let mut rd = self.borrow_mut().await;
- let nread = rd
- .read(&mut buf)
- .try_or_cancel(self.cancel_handle())
- .await?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
}
pub fn into_inner(self) -> S {
@@ -274,9 +270,7 @@ impl Resource for ChildStdinResource {
"childStdin".into()
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
+ deno_core::impl_writable!();
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
@@ -286,17 +280,12 @@ impl Resource for ChildStdinResource {
pub type ChildStdoutResource = ReadOnlyResource<process::ChildStdout>;
impl Resource for ChildStdoutResource {
+ deno_core::impl_readable_byob!();
+
fn name(&self) -> Cow<str> {
"childStdout".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
fn close(self: Rc<Self>) {
self.cancel_read_ops();
}
@@ -305,17 +294,12 @@ impl Resource for ChildStdoutResource {
pub type ChildStderrResource = ReadOnlyResource<process::ChildStderr>;
impl Resource for ChildStderrResource {
+ deno_core::impl_readable_byob!();
+
fn name(&self) -> Cow<str> {
"childStderr".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
fn close(self: Rc<Self>) {
self.cancel_read_ops();
}
@@ -534,25 +518,34 @@ impl StdFileResource {
result
}
- async fn read(
+ async fn read_byob(
self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ mut buf: BufMutView,
+ ) -> Result<(usize, BufMutView), AnyError> {
self
- .with_inner_blocking_task(
- move |inner| -> Result<(usize, ZeroCopyBuf), AnyError> {
- Ok((inner.read(&mut buf)?, buf))
- },
- )
+ .with_inner_blocking_task(move |inner| {
+ let nread = inner.read(&mut buf)?;
+ Ok((nread, buf))
+ })
.await
}
- async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
+ let buf = data.to_owned();
self
.with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf))
.await
}
+ async fn write_all(self: Rc<Self>, data: &[u8]) -> Result<(), AnyError> {
+ let buf = data.to_owned();
+ self
+ .with_inner_blocking_task(move |inner| {
+ inner.write_all_and_maybe_flush(&buf)
+ })
+ .await
+ }
+
fn with_resource<F, R>(
state: &mut OpState,
rid: ResourceId,
@@ -641,17 +634,28 @@ impl Resource for StdFileResource {
self.name.as_str().into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<deno_core::BufView> {
+ Box::pin(async move {
+ let vec = vec![0; limit];
+ let buf = BufMutView::from(vec);
+ let (nread, buf) = self.read_byob(buf).await?;
+ let mut vec = buf.unwrap_vec();
+ if vec.len() != nread {
+ vec.truncate(nread);
+ }
+ Ok(BufView::from(vec))
+ })
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
+ fn read_byob(
+ self: Rc<Self>,
+ buf: deno_core::BufMutView,
+ ) -> AsyncResult<(usize, deno_core::BufMutView)> {
+ Box::pin(self.read_byob(buf))
}
+ deno_core::impl_writable!(with_all);
+
#[cfg(unix)]
fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> {
use std::os::unix::io::AsRawFd;