summaryrefslogtreecommitdiff
path: root/ext/fetch/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r--ext/fetch/lib.rs73
1 files changed, 45 insertions, 28 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 48f6a0294..b2b71ec56 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-mod byte_stream;
mod fs_fetch_handler;
use std::borrow::Cow;
@@ -13,10 +12,12 @@ use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
+use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::op;
@@ -69,8 +70,6 @@ pub use reqwest;
pub use fs_fetch_handler::FsFetchHandler;
-pub use crate::byte_stream::MpscByteStream;
-
#[derive(Clone)]
pub struct Options {
pub user_agent: String,
@@ -293,7 +292,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (stream, tx) = MpscByteStream::new();
+ let (tx, stream) = tokio::sync::mpsc::channel(1);
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -302,11 +301,11 @@ where
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
}
- request = request.body(Body::wrap_stream(stream));
+ request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(tx),
+ body: AsyncRefCell::new(Some(tx)),
cancel: CancelHandle::default(),
});
@@ -604,8 +603,21 @@ impl Resource for FetchCancelHandle {
}
}
+/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
+pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
+
+impl Stream for FetchBodyStream {
+ type Item = Result<bytes::Bytes, Error>;
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.0.poll_recv(cx)
+ }
+}
+
pub struct FetchRequestBodyResource {
- pub body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
+ pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>,
pub cancel: CancelHandle,
}
@@ -619,38 +631,43 @@ impl Resource for FetchRequestBodyResource {
let bytes: bytes::Bytes = buf.into();
let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let body = (*body).as_ref();
let cancel = RcRef::map(self, |r| &r.cancel);
- body
- .send(Some(bytes))
- .or_cancel(cancel)
- .await?
- .map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
+ let body = body.ok_or(type_error(
+ "request body receiver not connected (request closed)",
+ ))?;
+ body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
+ type_error("request body receiver not connected (request closed)")
+ })?;
Ok(WriteOutcome::Full { nwritten })
})
}
- fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
- Box::pin(async move {
+ fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> {
+ async move {
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let body = (*body).as_ref();
let cancel = RcRef::map(self, |r| &r.cancel);
- // There is a case where hyper knows the size of the response body up
- // front (through content-length header on the resp), where it will drop
- // the body once that content length has been reached, regardless of if
- // the stream is complete or not. This is expected behaviour, but it means
- // that if you stream a body with an up front known size (eg a Blob),
- // explicit shutdown can never succeed because the body (and by extension
- // the receiver) will have dropped by the time we try to shutdown. As such
- // we ignore if the receiver is closed, because we know that the request
- // is complete in good health in that case.
- body.send(None).or_cancel(cancel).await?.ok();
+ let body = body.ok_or(type_error(
+ "request body receiver not connected (request closed)",
+ ))?;
+ body.send(Err(error)).or_cancel(cancel).await??;
Ok(())
- })
+ }
+ .boxed_local()
+ }
+
+ fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
+ async move {
+ let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ body.take();
+ Ok(())
+ }
+ .boxed_local()
}
fn close(self: Rc<Self>) {
- self.cancel.cancel()
+ self.cancel.cancel();
}
}