summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/cache/sqlite.rs27
-rw-r--r--ext/fetch/lib.rs63
-rw-r--r--ext/flash/lib.rs12
-rw-r--r--ext/http/lib.rs110
-rw-r--r--ext/net/io.rs57
-rw-r--r--ext/net/ops_tls.rs30
6 files changed, 130 insertions, 169 deletions
diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs
index 7e97fb563..75aa7cc6e 100644
--- a/ext/cache/sqlite.rs
+++ b/ext/cache/sqlite.rs
@@ -7,7 +7,6 @@ use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::Resource;
-use deno_core::ZeroCopyBuf;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
@@ -347,10 +346,10 @@ pub struct CachePutResource {
}
impl CachePutResource {
- async fn write(self: Rc<Self>, data: ZeroCopyBuf) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
- file.write_all(&data).await?;
+ file.write_all(data).await?;
Ok(data.len())
}
@@ -374,9 +373,7 @@ impl Resource for CachePutResource {
"CachePutResource".into()
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
+ deno_core::impl_writable!();
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
@@ -394,28 +391,20 @@ impl CacheResponseResource {
}
}
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
- let nread = file.read(&mut buf).await?;
- Ok((nread, buf))
+ let nread = file.read(data).await?;
+ Ok(nread)
}
}
impl Resource for CacheResponseResource {
+ deno_core::impl_readable_byob!();
+
fn name(&self) -> Cow<str> {
"CacheResponseResource".into()
}
-
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
}
pub fn hash(token: &str) -> String {
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 0adc32343..b8f784284 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -5,11 +5,14 @@ mod fs_fetch_handler;
use data_url::DataUrl;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::include_js_files;
use deno_core::op;
+use deno_core::BufView;
+use deno_core::WriteOutcome;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
@@ -43,15 +46,14 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::cmp::min;
use std::convert::From;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
-use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use tokio_util::io::StreamReader;
// Re-export reqwest and data_url
pub use data_url;
@@ -252,7 +254,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+ let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1);
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -401,12 +403,11 @@ pub async fn op_fetch_send(
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
- let stream_reader = StreamReader::new(stream);
let rid = state
.borrow_mut()
.resource_table
.add(FetchResponseBodyResource {
- reader: AsyncRefCell::new(stream_reader),
+ reader: AsyncRefCell::new(stream.peekable()),
cancel: CancelHandle::default(),
size: content_length,
});
@@ -446,7 +447,7 @@ impl Resource for FetchCancelHandle {
}
pub struct FetchRequestBodyResource {
- body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>,
+ body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>,
cancel: CancelHandle,
}
@@ -455,17 +456,16 @@ impl Resource for FetchRequestBodyResource {
"fetchRequestBody".into()
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
Box::pin(async move {
- let data = buf.to_vec();
- let len = data.len();
+ let bytes: bytes::Bytes = buf.into();
+ let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| {
+ body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
type_error("request body receiver not connected (request closed)")
})?;
-
- Ok(len)
+ Ok(WriteOutcome::Full { nwritten })
})
}
@@ -478,7 +478,7 @@ type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
struct FetchResponseBodyResource {
- reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
+ reader: AsyncRefCell<Peekable<BytesStream>>,
cancel: CancelHandle,
size: Option<u64>,
}
@@ -488,15 +488,36 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
- fn read_return(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(async move {
- let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok((read, buf))
+ let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
+
+ let fut = async move {
+ let mut reader = Pin::new(reader);
+ loop {
+ match reader.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(limit, chunk.len());
+ let chunk = chunk.split_to(len);
+ break Ok(chunk.into());
+ }
+ // This unwrap is safe because `peek_mut()` returned `Some`, and thus
+ // currently has a peeked value that can be synchronously returned
+ // from `next()`.
+ //
+ // The future returned from `next()` is always ready, so we can
+ // safely call `await` on it without creating a race condition.
+ Some(_) => match reader.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(BufView::empty()),
+ }
+ }
+ };
+
+ let cancel_handle = RcRef::map(self, |r| &r.cancel);
+ fut.try_or_cancel(cancel_handle).await
})
}
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index a7bd8b439..f9ce1c744 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -253,20 +253,16 @@ async fn op_flash_write_resource(
.write_all(b"Transfer-Encoding: chunked\r\n\r\n")
.await?;
loop {
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
+ let view = resource.clone().read(64 * 1024).await?; // 64KB
+ if view.is_empty() {
stream.write_all(b"0\r\n\r\n").await?;
break;
}
-
- let response = &buf[..nread];
// TODO(@littledivy): use vectored writes.
stream
- .write_all(format!("{:x}\r\n", response.len()).as_bytes())
+ .write_all(format!("{:x}\r\n", view.len()).as_bytes())
.await?;
- stream.write_all(response).await?;
+ stream.write_all(&view).await?;
stream.write_all(b"\r\n").await?;
}
resource.close();
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index a8c2810bc..e71d9fae3 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -23,6 +23,8 @@ use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op;
use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -333,61 +335,58 @@ impl HttpStreamResource {
}
}
-impl HttpStreamResource {
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
- let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
-
- let body = loop {
- match &mut *rd {
- HttpRequestReader::Headers(_) => {}
- HttpRequestReader::Body(_, body) => break body,
- HttpRequestReader::Closed => return Ok((0, buf)),
- }
- match take(&mut *rd) {
- HttpRequestReader::Headers(request) => {
- let (parts, body) = request.into_parts();
- *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+impl Resource for HttpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "httpStream".into()
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(async move {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+
+ let body = loop {
+ match &mut *rd {
+ HttpRequestReader::Headers(_) => {}
+ HttpRequestReader::Body(_, body) => break body,
+ HttpRequestReader::Closed => return Ok(BufView::empty()),
}
- _ => unreachable!(),
+ match take(&mut *rd) {
+ HttpRequestReader::Headers(request) => {
+ let (parts, body) = request.into_parts();
+ *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+ }
+ _ => unreachable!(),
+ };
};
- };
- let fut = async {
- let mut body = Pin::new(body);
- loop {
- match body.as_mut().peek_mut().await {
- Some(Ok(chunk)) if !chunk.is_empty() => {
- let len = min(buf.len(), chunk.len());
- buf[..len].copy_from_slice(&chunk.split_to(len));
- break Ok((len, buf));
+ let fut = async {
+ let mut body = Pin::new(body);
+ loop {
+ match body.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(limit, chunk.len());
+ let buf = chunk.split_to(len);
+ let view = BufView::from(buf);
+ break Ok(view);
+ }
+ // This unwrap is safe because `peek_mut()` returned `Some`, and thus
+ // currently has a peeked value that can be synchronously returned
+ // from `next()`.
+ //
+ // The future returned from `next()` is always ready, so we can
+ // safely call `await` on it without creating a race condition.
+ Some(_) => match body.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(BufView::empty()),
}
- Some(_) => match body.as_mut().next().await.unwrap() {
- Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
- },
- None => break Ok((0, buf)),
}
- }
- };
-
- let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
- fut.try_or_cancel(cancel_handle).await
- }
-}
-
-impl Resource for HttpStreamResource {
- fn name(&self) -> Cow<str> {
- "httpStream".into()
- }
+ };
- fn read_return(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(_buf))
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
+ fut.try_or_cancel(cancel_handle).await
+ })
}
fn close(self: Rc<Self>) {
@@ -763,16 +762,14 @@ async fn op_http_write_resource(
_ => {}
};
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
+ let view = resource.clone().read(64 * 1024).await?; // 64KB
+ if view.is_empty() {
break;
}
match &mut *wr {
HttpResponseWriter::Body(body) => {
- if let Err(err) = body.write_all(&buf[..nread]).await {
+ if let Err(err) = body.write_all(&view).await {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
@@ -782,9 +779,8 @@ async fn op_http_write_resource(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
- let mut buf = buf.to_temp();
- buf.truncate(nread);
- if let Err(err) = body.send_data(Bytes::from(buf)).await {
+ let bytes = Bytes::from(view);
+ if let Err(err) = body.send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
diff --git a/ext/net/io.rs b/ext/net/io.rs
index c9587c851..4c9fbe3d2 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -9,7 +9,6 @@ use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
-use deno_core::ZeroCopyBuf;
use socket2::SockRef;
use std::borrow::Cow;
use std::rc::Rc;
@@ -69,22 +68,16 @@ where
pub async fn read(
self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ data: &mut [u8],
+ ) -> Result<usize, AnyError> {
let mut rd = self.rd_borrow_mut().await;
- let nread = rd
- .read(&mut buf)
- .try_or_cancel(self.cancel_handle())
- .await?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
}
- pub async fn write(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let mut wr = self.wr_borrow_mut().await;
- let nwritten = wr.write(&buf).await?;
+ let nwritten = wr.write(data).await?;
Ok(nwritten)
}
@@ -99,21 +92,13 @@ pub type TcpStreamResource =
FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
impl Resource for TcpStreamResource {
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+
fn name(&self) -> Cow<str> {
"tcpStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
-
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
@@ -161,16 +146,10 @@ pub struct UnixStreamResource;
#[cfg(not(unix))]
impl UnixStreamResource {
- pub async fn read(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ fn read(self: Rc<Self>, data: &mut [u8]) -> AsyncResult<usize> {
unreachable!()
}
- pub async fn write(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ fn write(self: Rc<Self>, data: &[u8]) -> AsyncResult<usize> {
unreachable!()
}
pub async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> {
@@ -182,21 +161,13 @@ impl UnixStreamResource {
}
impl Resource for UnixStreamResource {
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+
fn name(&self) -> Cow<str> {
"unixStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
-
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index 230f4359e..a1b48b84e 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -38,7 +38,6 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
use deno_tls::create_client_config;
use deno_tls::load_certs;
use deno_tls::load_private_keys;
@@ -691,21 +690,18 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ data: &mut [u8],
+ ) -> Result<usize, AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
- let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
- Ok((nread, buf))
+ let nread = rd.read(data).try_or_cancel(cancel_handle).await?;
+ Ok(nread)
}
- pub async fn write(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
self.handshake().await?;
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
- let nwritten = wr.write(&buf).await?;
+ let nwritten = wr.write(data).await?;
wr.flush().await?;
Ok(nwritten)
}
@@ -736,21 +732,13 @@ impl TlsStreamResource {
}
impl Resource for TlsStreamResource {
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+
fn name(&self) -> Cow<str> {
"tlsStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(buf))
- }
-
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(self.write(buf))
- }
-
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}