diff options
Diffstat (limited to 'ext/fetch')
-rw-r--r-- | ext/fetch/lib.rs | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 0adc32343..b8f784284 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -5,11 +5,14 @@ mod fs_fetch_handler; use data_url::DataUrl; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::stream::Peekable; use deno_core::futures::Future; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::include_js_files; use deno_core::op; +use deno_core::BufView; +use deno_core::WriteOutcome; use deno_core::url::Url; use deno_core::AsyncRefCell; @@ -43,15 +46,14 @@ use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::cmp::min; use std::convert::From; use std::path::Path; use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; -use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::StreamReader; // Re-export reqwest and data_url pub use data_url; @@ -252,7 +254,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); + let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1); // If the size of the body is known, we include a content-length // header explicitly. @@ -401,12 +403,11 @@ pub async fn op_fetch_send( let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) })); - let stream_reader = StreamReader::new(stream); let rid = state .borrow_mut() .resource_table .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream_reader), + reader: AsyncRefCell::new(stream.peekable()), cancel: CancelHandle::default(), size: content_length, }); @@ -446,7 +447,7 @@ impl Resource for FetchCancelHandle { } pub struct FetchRequestBodyResource { - body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>, + body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>, cancel: CancelHandle, } @@ -455,17 +456,16 @@ impl Resource for FetchRequestBodyResource { "fetchRequestBody".into() } - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> { Box::pin(async move { - let data = buf.to_vec(); - let len = data.len(); + let bytes: bytes::Bytes = buf.into(); + let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| { + body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { type_error("request body receiver not connected (request closed)") })?; - - Ok(len) + Ok(WriteOutcome::Full { nwritten }) }) } @@ -478,7 +478,7 @@ type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; struct FetchResponseBodyResource { - reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, + reader: AsyncRefCell<Peekable<BytesStream>>, cancel: CancelHandle, size: Option<u64>, } @@ -488,15 +488,36 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } - fn read_return( - self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { Box::pin(async move { - let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(self, |r| &r.cancel); - let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok((read, buf)) + let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; + + let fut = async move { + let mut reader = Pin::new(reader); + loop { + match reader.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let chunk = chunk.split_to(len); + break Ok(chunk.into()); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match reader.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(BufView::empty()), + } + } + }; + + let cancel_handle = RcRef::map(self, |r| &r.cancel); + fut.try_or_cancel(cancel_handle).await }) } |