diff options
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 73 |
1 files changed, 45 insertions, 28 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 48f6a0294..b2b71ec56 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,6 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -mod byte_stream; mod fs_fetch_handler; use std::borrow::Cow; @@ -13,10 +12,12 @@ use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use deno_core::anyhow::Error; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::stream::Peekable; use deno_core::futures::Future; +use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::op; @@ -69,8 +70,6 @@ pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; -pub use crate::byte_stream::MpscByteStream; - #[derive(Clone)] pub struct Options { pub user_agent: String, @@ -293,7 +292,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (stream, tx) = MpscByteStream::new(); + let (tx, stream) = tokio::sync::mpsc::channel(1); // If the size of the body is known, we include a content-length // header explicitly. @@ -302,11 +301,11 @@ where request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) } - request = request.body(Body::wrap_stream(stream)); + request = request.body(Body::wrap_stream(FetchBodyStream(stream))); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(tx), + body: AsyncRefCell::new(Some(tx)), cancel: CancelHandle::default(), }); @@ -604,8 +603,21 @@ impl Resource for FetchCancelHandle { } } +/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`]. +pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>); + +impl Stream for FetchBodyStream { + type Item = Result<bytes::Bytes, Error>; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { + self.0.poll_recv(cx) + } +} + pub struct FetchRequestBodyResource { - pub body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>, + pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>, pub cancel: CancelHandle, } @@ -619,38 +631,43 @@ impl Resource for FetchRequestBodyResource { let bytes: bytes::Bytes = buf.into(); let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let body = (*body).as_ref(); let cancel = RcRef::map(self, |r| &r.cancel); - body - .send(Some(bytes)) - .or_cancel(cancel) - .await? - .map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; + let body = body.ok_or(type_error( + "request body receiver not connected (request closed)", + ))?; + body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; Ok(WriteOutcome::Full { nwritten }) }) } - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { - Box::pin(async move { + fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> { + async move { let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let body = (*body).as_ref(); let cancel = RcRef::map(self, |r| &r.cancel); - // There is a case where hyper knows the size of the response body up - // front (through content-length header on the resp), where it will drop - // the body once that content length has been reached, regardless of if - // the stream is complete or not. This is expected behaviour, but it means - // that if you stream a body with an up front known size (eg a Blob), - // explicit shutdown can never succeed because the body (and by extension - // the receiver) will have dropped by the time we try to shutdown. As such - // we ignore if the receiver is closed, because we know that the request - // is complete in good health in that case. - body.send(None).or_cancel(cancel).await?.ok(); + let body = body.ok_or(type_error( + "request body receiver not connected (request closed)", + ))?; + body.send(Err(error)).or_cancel(cancel).await??; Ok(()) - }) + } + .boxed_local() + } + + fn shutdown(self: Rc<Self>) -> AsyncResult<()> { + async move { + let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + body.take(); + Ok(()) + } + .boxed_local() } fn close(self: Rc<Self>) { - self.cancel.cancel() + self.cancel.cancel(); } } |