summaryrefslogtreecommitdiff
path: root/ext/fetch/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r--ext/fetch/lib.rs63
1 files changed, 42 insertions, 21 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 0adc32343..b8f784284 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -5,11 +5,14 @@ mod fs_fetch_handler;
use data_url::DataUrl;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::include_js_files;
use deno_core::op;
+use deno_core::BufView;
+use deno_core::WriteOutcome;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
@@ -43,15 +46,14 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::cmp::min;
use std::convert::From;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
-use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use tokio_util::io::StreamReader;
// Re-export reqwest and data_url
pub use data_url;
@@ -252,7 +254,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+ let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1);
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -401,12 +403,11 @@ pub async fn op_fetch_send(
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
- let stream_reader = StreamReader::new(stream);
let rid = state
.borrow_mut()
.resource_table
.add(FetchResponseBodyResource {
- reader: AsyncRefCell::new(stream_reader),
+ reader: AsyncRefCell::new(stream.peekable()),
cancel: CancelHandle::default(),
size: content_length,
});
@@ -446,7 +447,7 @@ impl Resource for FetchCancelHandle {
}
pub struct FetchRequestBodyResource {
- body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>,
+ body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>,
cancel: CancelHandle,
}
@@ -455,17 +456,16 @@ impl Resource for FetchRequestBodyResource {
"fetchRequestBody".into()
}
- fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
Box::pin(async move {
- let data = buf.to_vec();
- let len = data.len();
+ let bytes: bytes::Bytes = buf.into();
+ let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| {
+ body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
type_error("request body receiver not connected (request closed)")
})?;
-
- Ok(len)
+ Ok(WriteOutcome::Full { nwritten })
})
}
@@ -478,7 +478,7 @@ type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
struct FetchResponseBodyResource {
- reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
+ reader: AsyncRefCell<Peekable<BytesStream>>,
cancel: CancelHandle,
size: Option<u64>,
}
@@ -488,15 +488,36 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
- fn read_return(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(async move {
- let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok((read, buf))
+ let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
+
+ let fut = async move {
+ let mut reader = Pin::new(reader);
+ loop {
+ match reader.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(limit, chunk.len());
+ let chunk = chunk.split_to(len);
+ break Ok(chunk.into());
+ }
+ // This unwrap is safe because `peek_mut()` returned `Some`, and thus
+ // currently has a peeked value that can be synchronously returned
+ // from `next()`.
+ //
+ // The future returned from `next()` is always ready, so we can
+ // safely call `await` on it without creating a race condition.
+ Some(_) => match reader.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(BufView::empty()),
+ }
+ }
+ };
+
+ let cancel_handle = RcRef::map(self, |r| &r.cancel);
+ fut.try_or_cancel(cancel_handle).await
})
}