summaryrefslogtreecommitdiff
path: root/ext/fetch/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r--ext/fetch/lib.rs38
1 files changed, 31 insertions, 7 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index f79fc33f4..c19336e7d 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -1,5 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+mod byte_stream;
mod fs_fetch_handler;
use data_url::DataUrl;
@@ -55,7 +56,6 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use tokio::sync::mpsc;
-use tokio_stream::wrappers::ReceiverStream;
// Re-export reqwest and data_url
pub use data_url;
@@ -63,6 +63,8 @@ pub use reqwest;
pub use fs_fetch_handler::FsFetchHandler;
+use crate::byte_stream::MpscByteStream;
+
#[derive(Clone)]
pub struct Options {
pub user_agent: String,
@@ -256,7 +258,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1);
+ let (stream, tx) = MpscByteStream::new();
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -265,7 +267,7 @@ where
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
}
- request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
+ request = request.body(Body::wrap_stream(stream));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
@@ -459,7 +461,7 @@ impl Resource for FetchCancelHandle {
}
pub struct FetchRequestBodyResource {
- body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>,
+ body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
cancel: CancelHandle,
}
@@ -474,13 +476,35 @@ impl Resource for FetchRequestBodyResource {
let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
+ body
+ .send(Some(bytes))
+ .or_cancel(cancel)
+ .await?
+ .map_err(|_| {
+ type_error("request body receiver not connected (request closed)")
+ })?;
Ok(WriteOutcome::Full { nwritten })
})
}
+ fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
+ Box::pin(async move {
+ let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ // There is a case where hyper knows the size of the response body up
+ // front (through content-length header on the resp), where it will drop
+ // the body once that content length has been reached, regardless of if
+ // the stream is complete or not. This is expected behaviour, but it means
+ // that if you stream a body with an up front known size (eg a Blob),
+ // explicit shutdown can never succeed because the body (and by extension
+ // the receiver) will have dropped by the time we try to shutdown. As such
+ // we ignore if the receiver is closed, because we know that the request
+ // is complete in good health in that case.
+ body.send(None).or_cancel(cancel).await?.ok();
+ Ok(())
+ })
+ }
+
fn close(self: Rc<Self>) {
self.cancel.cancel()
}