diff options
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 38 |
1 files changed, 31 insertions, 7 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index f79fc33f4..c19336e7d 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,5 +1,6 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +mod byte_stream; mod fs_fetch_handler; use data_url::DataUrl; @@ -55,7 +56,6 @@ use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; // Re-export reqwest and data_url pub use data_url; @@ -63,6 +63,8 @@ pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; +use crate::byte_stream::MpscByteStream; + #[derive(Clone)] pub struct Options { pub user_agent: String, @@ -256,7 +258,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1); + let (stream, tx) = MpscByteStream::new(); // If the size of the body is known, we include a content-length // header explicitly. @@ -265,7 +267,7 @@ where request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) } - request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); + request = request.body(Body::wrap_stream(stream)); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { @@ -459,7 +461,7 @@ impl Resource for FetchCancelHandle { } pub struct FetchRequestBodyResource { - body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>, + body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>, cancel: CancelHandle, } @@ -474,13 +476,35 @@ impl Resource for FetchRequestBodyResource { let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; + body + .send(Some(bytes)) + .or_cancel(cancel) + .await? + .map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; Ok(WriteOutcome::Full { nwritten }) }) } + fn shutdown(self: Rc<Self>) -> AsyncResult<()> { + Box::pin(async move { + let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + // There is a case where hyper knows the size of the response body up + // front (through content-length header on the resp), where it will drop + // the body once that content length has been reached, regardless of if + // the stream is complete or not. This is expected behaviour, but it means + // that if you stream a body with an up front known size (eg a Blob), + // explicit shutdown can never succeed because the body (and by extension + // the receiver) will have dropped by the time we try to shutdown. As such + // we ignore if the receiver is closed, because we know that the request + // is complete in good health in that case. + body.send(None).or_cancel(cancel).await?.ok(); + Ok(()) + }) + } + fn close(self: Rc<Self>) { self.cancel.cancel() } |