diff options
-rw-r--r-- | ext/fetch/26_fetch.js | 32 | ||||
-rw-r--r-- | ext/fetch/byte_stream.rs | 87 | ||||
-rw-r--r-- | ext/fetch/lib.rs | 73 | ||||
-rw-r--r-- | ext/node/ops/http.rs | 8 |
4 files changed, 70 insertions, 130 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 5084fab34..6be63d077 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -201,6 +201,23 @@ async function mainFetch(req, recursive, terminator) { let requestSendError; let requestSendErrorSet = false; + + async function propagateError(err, message) { + // TODO(lucacasonato): propagate error into response body stream + try { + await core.writeTypeError(requestBodyRid, message); + } catch (err) { + if (!requestSendErrorSet) { + requestSendErrorSet = true; + requestSendError = err; + } + } + if (!requestSendErrorSet) { + requestSendErrorSet = true; + requestSendError = err; + } + } + if (requestBodyRid !== null) { if ( reqBody === null || @@ -220,9 +237,7 @@ async function mainFetch(req, recursive, terminator) { val = res.value; } catch (err) { if (terminator.aborted) break; - // TODO(lucacasonato): propagate error into response body stream - requestSendError = err; - requestSendErrorSet = true; + await propagateError(err, "failed to read"); break; } if (done) break; @@ -231,9 +246,7 @@ async function mainFetch(req, recursive, terminator) { "Item in request body ReadableStream is not a Uint8Array", ); await reader.cancel(error); - // TODO(lucacasonato): propagate error into response body stream - requestSendError = error; - requestSendErrorSet = true; + await propagateError(error, error.message); break; } try { @@ -241,9 +254,7 @@ async function mainFetch(req, recursive, terminator) { } catch (err) { if (terminator.aborted) break; await reader.cancel(err); - // TODO(lucacasonato): propagate error into response body stream - requestSendError = err; - requestSendErrorSet = true; + await propagateError(err, "failed to write"); break; } } @@ -252,8 +263,7 @@ async function mainFetch(req, recursive, terminator) { await core.shutdown(requestBodyRid); } catch (err) { if (!terminator.aborted) { - requestSendError = err; - requestSendErrorSet = true; + await propagateError(err, "failed to flush"); } } } diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs deleted file mode 100644 index 33cbfe76e..000000000 --- a/ext/fetch/byte_stream.rs +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use deno_core::futures::Stream; -use tokio::sync::mpsc; - -/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is -/// used to bridge between the fetch task and the HTTP body stream. The stream -/// has the special property that it errors if the channel is closed before an -/// explicit EOF is sent (in the form of a [None] value on the sender). -pub struct MpscByteStream { - receiver: mpsc::Receiver<Option<bytes::Bytes>>, - shutdown: bool, -} - -impl MpscByteStream { - pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) { - let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1); - let this = Self { - receiver, - shutdown: false, - }; - (this, sender) - } -} - -impl Stream for MpscByteStream { - type Item = Result<bytes::Bytes, std::io::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - let val = std::task::ready!(self.receiver.poll_recv(cx)); - match val { - None if self.shutdown => Poll::Ready(None), - None => Poll::Ready(Some(Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "channel closed", - )))), - Some(None) => { - self.shutdown = true; - Poll::Ready(None) - } - Some(Some(val)) => Poll::Ready(Some(Ok(val))), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use bytes::Bytes; - use deno_core::futures::StreamExt; - - #[tokio::test] - async fn success() { - let (mut stream, sender) = MpscByteStream::new(); - - sender.send(Some(Bytes::from("hello"))).await.unwrap(); - assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); - - sender.send(Some(Bytes::from("world"))).await.unwrap(); - assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); - - sender.send(None).await.unwrap(); - drop(sender); - assert!(stream.next().await.is_none()); - } - - #[tokio::test] - async fn error() { - let (mut stream, sender) = MpscByteStream::new(); - - sender.send(Some(Bytes::from("hello"))).await.unwrap(); - assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); - - drop(sender); - assert_eq!( - stream.next().await.unwrap().unwrap_err().kind(), - std::io::ErrorKind::UnexpectedEof - ); - } -} diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 48f6a0294..b2b71ec56 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,6 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -mod byte_stream; mod fs_fetch_handler; use std::borrow::Cow; @@ -13,10 +12,12 @@ use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use deno_core::anyhow::Error; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::stream::Peekable; use deno_core::futures::Future; +use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::op; @@ -69,8 +70,6 @@ pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; -pub use crate::byte_stream::MpscByteStream; - #[derive(Clone)] pub struct Options { pub user_agent: String, @@ -293,7 +292,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (stream, tx) = MpscByteStream::new(); + let (tx, stream) = tokio::sync::mpsc::channel(1); // If the size of the body is known, we include a content-length // header explicitly. @@ -302,11 +301,11 @@ where request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) } - request = request.body(Body::wrap_stream(stream)); + request = request.body(Body::wrap_stream(FetchBodyStream(stream))); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(tx), + body: AsyncRefCell::new(Some(tx)), cancel: CancelHandle::default(), }); @@ -604,8 +603,21 @@ impl Resource for FetchCancelHandle { } } +/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`]. +pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>); + +impl Stream for FetchBodyStream { + type Item = Result<bytes::Bytes, Error>; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { + self.0.poll_recv(cx) + } +} + pub struct FetchRequestBodyResource { - pub body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>, + pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>, pub cancel: CancelHandle, } @@ -619,38 +631,43 @@ impl Resource for FetchRequestBodyResource { let bytes: bytes::Bytes = buf.into(); let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let body = (*body).as_ref(); let cancel = RcRef::map(self, |r| &r.cancel); - body - .send(Some(bytes)) - .or_cancel(cancel) - .await? - .map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; + let body = body.ok_or(type_error( + "request body receiver not connected (request closed)", + ))?; + body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; Ok(WriteOutcome::Full { nwritten }) }) } - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { - Box::pin(async move { + fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> { + async move { let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let body = (*body).as_ref(); let cancel = RcRef::map(self, |r| &r.cancel); - // There is a case where hyper knows the size of the response body up - // front (through content-length header on the resp), where it will drop - // the body once that content length has been reached, regardless of if - // the stream is complete or not. This is expected behaviour, but it means - // that if you stream a body with an up front known size (eg a Blob), - // explicit shutdown can never succeed because the body (and by extension - // the receiver) will have dropped by the time we try to shutdown. As such - // we ignore if the receiver is closed, because we know that the request - // is complete in good health in that case. - body.send(None).or_cancel(cancel).await?.ok(); + let body = body.ok_or(type_error( + "request body receiver not connected (request closed)", + ))?; + body.send(Err(error)).or_cancel(cancel).await??; Ok(()) - }) + } + .boxed_local() + } + + fn shutdown(self: Rc<Self>) -> AsyncResult<()> { + async move { + let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + body.take(); + Ok(()) + } + .boxed_local() } fn close(self: Rc<Self>) { - self.cancel.cancel() + self.cancel.cancel(); } } diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index cc7dbf522..2a4d31f50 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -10,12 +10,12 @@ use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::OpState; use deno_fetch::get_or_create_client_from_state; +use deno_fetch::FetchBodyStream; use deno_fetch::FetchCancelHandle; use deno_fetch::FetchRequestBodyResource; use deno_fetch::FetchRequestResource; use deno_fetch::FetchReturn; use deno_fetch::HttpClientResource; -use deno_fetch::MpscByteStream; use reqwest::header::HeaderMap; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; @@ -64,12 +64,12 @@ where let request_body_rid = if has_body { // If no body is passed, we return a writer for streaming the body. - let (stream, tx) = MpscByteStream::new(); + let (tx, stream) = tokio::sync::mpsc::channel(1); - request = request.body(Body::wrap_stream(stream)); + request = request.body(Body::wrap_stream(FetchBodyStream(stream))); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(tx), + body: AsyncRefCell::new(Some(tx)), cancel: CancelHandle::default(), }); |