summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/fetch/26_fetch.js32
-rw-r--r--ext/fetch/byte_stream.rs87
-rw-r--r--ext/fetch/lib.rs73
-rw-r--r--ext/node/ops/http.rs8
4 files changed, 70 insertions, 130 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 5084fab34..6be63d077 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -201,6 +201,23 @@ async function mainFetch(req, recursive, terminator) {
let requestSendError;
let requestSendErrorSet = false;
+
+ async function propagateError(err, message) {
+ // TODO(lucacasonato): propagate error into response body stream
+ try {
+ await core.writeTypeError(requestBodyRid, message);
+ } catch (err) {
+ if (!requestSendErrorSet) {
+ requestSendErrorSet = true;
+ requestSendError = err;
+ }
+ }
+ if (!requestSendErrorSet) {
+ requestSendErrorSet = true;
+ requestSendError = err;
+ }
+ }
+
if (requestBodyRid !== null) {
if (
reqBody === null ||
@@ -220,9 +237,7 @@ async function mainFetch(req, recursive, terminator) {
val = res.value;
} catch (err) {
if (terminator.aborted) break;
- // TODO(lucacasonato): propagate error into response body stream
- requestSendError = err;
- requestSendErrorSet = true;
+ await propagateError(err, "failed to read");
break;
}
if (done) break;
@@ -231,9 +246,7 @@ async function mainFetch(req, recursive, terminator) {
"Item in request body ReadableStream is not a Uint8Array",
);
await reader.cancel(error);
- // TODO(lucacasonato): propagate error into response body stream
- requestSendError = error;
- requestSendErrorSet = true;
+ await propagateError(error, error.message);
break;
}
try {
@@ -241,9 +254,7 @@ async function mainFetch(req, recursive, terminator) {
} catch (err) {
if (terminator.aborted) break;
await reader.cancel(err);
- // TODO(lucacasonato): propagate error into response body stream
- requestSendError = err;
- requestSendErrorSet = true;
+ await propagateError(err, "failed to write");
break;
}
}
@@ -252,8 +263,7 @@ async function mainFetch(req, recursive, terminator) {
await core.shutdown(requestBodyRid);
} catch (err) {
if (!terminator.aborted) {
- requestSendError = err;
- requestSendErrorSet = true;
+ await propagateError(err, "failed to flush");
}
}
}
diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs
deleted file mode 100644
index 33cbfe76e..000000000
--- a/ext/fetch/byte_stream.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
-
-use deno_core::futures::Stream;
-use tokio::sync::mpsc;
-
-/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
-/// used to bridge between the fetch task and the HTTP body stream. The stream
-/// has the special property that it errors if the channel is closed before an
-/// explicit EOF is sent (in the form of a [None] value on the sender).
-pub struct MpscByteStream {
- receiver: mpsc::Receiver<Option<bytes::Bytes>>,
- shutdown: bool,
-}
-
-impl MpscByteStream {
- pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
- let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
- let this = Self {
- receiver,
- shutdown: false,
- };
- (this, sender)
- }
-}
-
-impl Stream for MpscByteStream {
- type Item = Result<bytes::Bytes, std::io::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let val = std::task::ready!(self.receiver.poll_recv(cx));
- match val {
- None if self.shutdown => Poll::Ready(None),
- None => Poll::Ready(Some(Err(std::io::Error::new(
- std::io::ErrorKind::UnexpectedEof,
- "channel closed",
- )))),
- Some(None) => {
- self.shutdown = true;
- Poll::Ready(None)
- }
- Some(Some(val)) => Poll::Ready(Some(Ok(val))),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use bytes::Bytes;
- use deno_core::futures::StreamExt;
-
- #[tokio::test]
- async fn success() {
- let (mut stream, sender) = MpscByteStream::new();
-
- sender.send(Some(Bytes::from("hello"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
-
- sender.send(Some(Bytes::from("world"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
-
- sender.send(None).await.unwrap();
- drop(sender);
- assert!(stream.next().await.is_none());
- }
-
- #[tokio::test]
- async fn error() {
- let (mut stream, sender) = MpscByteStream::new();
-
- sender.send(Some(Bytes::from("hello"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
-
- drop(sender);
- assert_eq!(
- stream.next().await.unwrap().unwrap_err().kind(),
- std::io::ErrorKind::UnexpectedEof
- );
- }
-}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 48f6a0294..b2b71ec56 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-mod byte_stream;
mod fs_fetch_handler;
use std::borrow::Cow;
@@ -13,10 +12,12 @@ use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
+use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::op;
@@ -69,8 +70,6 @@ pub use reqwest;
pub use fs_fetch_handler::FsFetchHandler;
-pub use crate::byte_stream::MpscByteStream;
-
#[derive(Clone)]
pub struct Options {
pub user_agent: String,
@@ -293,7 +292,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (stream, tx) = MpscByteStream::new();
+ let (tx, stream) = tokio::sync::mpsc::channel(1);
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -302,11 +301,11 @@ where
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
}
- request = request.body(Body::wrap_stream(stream));
+ request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(tx),
+ body: AsyncRefCell::new(Some(tx)),
cancel: CancelHandle::default(),
});
@@ -604,8 +603,21 @@ impl Resource for FetchCancelHandle {
}
}
+/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
+pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
+
+impl Stream for FetchBodyStream {
+ type Item = Result<bytes::Bytes, Error>;
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.0.poll_recv(cx)
+ }
+}
+
pub struct FetchRequestBodyResource {
- pub body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
+ pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>,
pub cancel: CancelHandle,
}
@@ -619,38 +631,43 @@ impl Resource for FetchRequestBodyResource {
let bytes: bytes::Bytes = buf.into();
let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let body = (*body).as_ref();
let cancel = RcRef::map(self, |r| &r.cancel);
- body
- .send(Some(bytes))
- .or_cancel(cancel)
- .await?
- .map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
+ let body = body.ok_or(type_error(
+ "request body receiver not connected (request closed)",
+ ))?;
+ body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
+ type_error("request body receiver not connected (request closed)")
+ })?;
Ok(WriteOutcome::Full { nwritten })
})
}
- fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
- Box::pin(async move {
+ fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> {
+ async move {
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let body = (*body).as_ref();
let cancel = RcRef::map(self, |r| &r.cancel);
- // There is a case where hyper knows the size of the response body up
- // front (through content-length header on the resp), where it will drop
- // the body once that content length has been reached, regardless of if
- // the stream is complete or not. This is expected behaviour, but it means
- // that if you stream a body with an up front known size (eg a Blob),
- // explicit shutdown can never succeed because the body (and by extension
- // the receiver) will have dropped by the time we try to shutdown. As such
- // we ignore if the receiver is closed, because we know that the request
- // is complete in good health in that case.
- body.send(None).or_cancel(cancel).await?.ok();
+ let body = body.ok_or(type_error(
+ "request body receiver not connected (request closed)",
+ ))?;
+ body.send(Err(error)).or_cancel(cancel).await??;
Ok(())
- })
+ }
+ .boxed_local()
+ }
+
+ fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
+ async move {
+ let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ body.take();
+ Ok(())
+ }
+ .boxed_local()
}
fn close(self: Rc<Self>) {
- self.cancel.cancel()
+ self.cancel.cancel();
}
}
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index cc7dbf522..2a4d31f50 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -10,12 +10,12 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_fetch::get_or_create_client_from_state;
+use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
-use deno_fetch::MpscByteStream;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -64,12 +64,12 @@ where
let request_body_rid = if has_body {
// If no body is passed, we return a writer for streaming the body.
- let (stream, tx) = MpscByteStream::new();
+ let (tx, stream) = tokio::sync::mpsc::channel(1);
- request = request.body(Body::wrap_stream(stream));
+ request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(tx),
+ body: AsyncRefCell::new(Some(tx)),
cancel: CancelHandle::default(),
});