summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-04-20 22:09:13 +0530
committerGitHub <noreply@github.com>2022-04-20 18:39:13 +0200
commit2612b6f20fc21fb92402aa9086d13a7192ae3814 (patch)
tree59db7a916a2c8ad55000b152912fd4019f75121d
parent57a8fc37fc99491fa2559694f78af52a597bc501 (diff)
core: introduce `resource.read_return` (#14331)
-rw-r--r--.github/workflows/ci.yml6
-rw-r--r--core/examples/http_bench_json_ops.rs16
-rw-r--r--core/resources.rs12
-rw-r--r--ext/fetch/lib.rs7
-rw-r--r--ext/http/lib.rs9
-rw-r--r--ext/net/io.rs16
-rw-r--r--ext/net/ops_tls.rs9
-rw-r--r--runtime/ops/io.rs32
-rw-r--r--serde_v8/magic/buffer.rs8
9 files changed, 80 insertions, 35 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 26bf1ace8..1394be10a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -236,7 +236,7 @@ jobs:
~/.cargo/registry/index
~/.cargo/registry/cache
~/.cargo/git/db
- key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
+ key: 8-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
# In main branch, always creates fresh cache
- name: Cache build output (main)
@@ -252,7 +252,7 @@ jobs:
!./target/*/*.zip
!./target/*/*.tar.gz
key: |
- 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
+ 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
# Restore cache from the latest 'main' branch build.
- name: Cache build output (PR)
@@ -268,7 +268,7 @@ jobs:
!./target/*/*.tar.gz
key: never_saved
restore-keys: |
- 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
+ 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
# Don't save cache after building PRs or branches other than 'main'.
- name: Skip save cache (PR)
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index 2068c3b85..7c895f326 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -83,13 +83,18 @@ struct TcpStream {
}
impl TcpStream {
- async fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> Result<usize, Error> {
+ async fn read(
+ self: Rc<Self>,
+ mut buf: ZeroCopyBuf,
+ ) -> Result<(usize, ZeroCopyBuf), Error> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- rd.read(&mut buf)
+ let nread = rd
+ .read(&mut buf)
.try_or_cancel(cancel)
.await
- .map_err(Error::from)
+ .map_err(Error::from)?;
+ Ok((nread, buf))
}
async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
@@ -99,7 +104,10 @@ impl TcpStream {
}
impl Resource for TcpStream {
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/core/resources.rs b/core/resources.rs
index 9a1447392..ae4ef7394 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -36,7 +36,17 @@ pub trait Resource: Any + 'static {
}
/// Resources may implement `read()` to be a readable stream
- fn read(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ Box::pin(async move {
+ let (nread, _) = self.read_return(buf).await?;
+ Ok(nread)
+ })
+ }
+
+ fn read_return(
+ self: Rc<Self>,
+ _buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(futures::future::err(not_supported()))
}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index c216d53fa..def823d8f 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -485,12 +485,15 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
- fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ mut buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(async move {
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok(read)
+ Ok((read, buf))
})
}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index dff5c14cb..b85dcc473 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -688,16 +688,13 @@ async fn op_http_write_resource(
}
};
- let mut vec = vec![0u8; 64 * 1024];
- let vec_ptr = vec.as_mut_ptr();
+ let vec = vec![0u8; 64 * 1024]; // 64KB
let buf = ZeroCopyBuf::new_temp(vec);
- let nread = resource.clone().read(buf).await?;
+ let (nread, buf) = resource.clone().read_return(buf).await?;
if nread == 0 {
break;
}
- // SAFETY: ZeroCopyBuf keeps the Vec<u8> alive.
- let bytes =
- Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) });
+ let bytes = Bytes::from(buf.to_temp());
match body_tx.send_data(bytes).await {
Ok(_) => {}
Err(err) => {
diff --git a/ext/net/io.rs b/ext/net/io.rs
index 17b86af17..02caf7473 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -70,13 +70,13 @@ where
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.rd_borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
- Ok(nread)
+ Ok((nread, buf))
}
pub async fn write(
@@ -103,7 +103,10 @@ impl Resource for TcpStreamResource {
"tcpStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@@ -160,7 +163,7 @@ impl UnixStreamResource {
pub async fn read(
self: Rc<Self>,
_buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
unreachable!()
}
pub async fn write(
@@ -182,7 +185,10 @@ impl Resource for UnixStreamResource {
"unixStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index d6b83e6e8..ca922203c 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -674,11 +674,11 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
- Ok(nread)
+ Ok((nread, buf))
}
pub async fn write(
@@ -722,7 +722,10 @@ impl Resource for TlsStreamResource {
"tlsStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 6db5d69a9..b8449af86 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -170,13 +170,13 @@ where
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
- Ok(nread)
+ Ok((nread, buf))
}
}
@@ -203,7 +203,10 @@ impl Resource for ChildStdoutResource {
"childStdout".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@@ -219,7 +222,10 @@ impl Resource for ChildStderrResource {
"childStderr".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@@ -263,16 +269,17 @@ impl StdFileResource {
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
if self.fs_file.is_some() {
let fs_file = self.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
- tokio::task::spawn_blocking(move || {
- let mut std_file = std_file.lock().unwrap();
- std_file.read(&mut buf)
- })
+ tokio::task::spawn_blocking(
+ move || -> Result<(usize, ZeroCopyBuf), AnyError> {
+ let mut std_file = std_file.lock().unwrap();
+ Ok((std_file.read(&mut buf)?, buf))
+ },
+ )
.await?
- .map_err(AnyError::from)
} else {
Err(resource_unavailable())
}
@@ -322,7 +329,10 @@ impl Resource for StdFileResource {
self.name.as_str().into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs
index a0a1c974b..3a8c9499b 100644
--- a/serde_v8/magic/buffer.rs
+++ b/serde_v8/magic/buffer.rs
@@ -29,6 +29,14 @@ impl MagicBuffer {
pub fn new_temp(vec: Vec<u8>) -> Self {
MagicBuffer::Temp(vec)
}
+
+ // TODO(@littledivy): Temporary, this needs a refactor.
+ pub fn to_temp(self) -> Vec<u8> {
+ match self {
+ MagicBuffer::Temp(vec) => vec,
+ _ => unreachable!(),
+ }
+ }
}
impl Clone for MagicBuffer {