summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2022-12-19 12:49:00 +0100
committerGitHub <noreply@github.com>2022-12-19 12:49:00 +0100
commit43b6390629ad62edbeca3b884ccee53422876a1a (patch)
treee72fb0808b5abb636e29c41c5fa5a7ee2a547435
parent84b70dd2fb780a779930342d21c27e4e368070f1 (diff)
fix(ext/fetch): handle errors in req body stream (#17081)
Right now an error in a request body stream causes an uncatchable global promise rejection. This PR fixes this to instead propagate the error correctly into the promise returned from `fetch`. It additionally fixes errored readable stream bodies being treated as successfully completed bodies by Rust.
-rw-r--r--cli/tests/unit/fetch_test.ts44
-rw-r--r--ext/fetch/26_fetch.js73
-rw-r--r--ext/fetch/byte_stream.rs87
-rw-r--r--ext/fetch/lib.rs38
4 files changed, 212 insertions, 30 deletions
diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts
index fde119bf1..7035fe444 100644
--- a/cli/tests/unit/fetch_test.ts
+++ b/cli/tests/unit/fetch_test.ts
@@ -4,6 +4,7 @@ import {
assertEquals,
assertRejects,
deferred,
+ delay,
fail,
unimplemented,
} from "./test_util.ts";
@@ -1828,3 +1829,46 @@ Deno.test(
assertEquals(req2.headers.get("x-foo"), "bar");
},
);
+
+Deno.test(
+ { permissions: { net: true } },
+ async function fetchRequestBodyErrorCatchable() {
+ const listener = Deno.listen({ hostname: "127.0.0.1", port: 4514 });
+ const server = (async () => {
+ const conn = await listener.accept();
+ listener.close();
+ const buf = new Uint8Array(160);
+ const n = await conn.read(buf);
+ assertEquals(n, 160); // this is the request headers + first body chunk
+ const n2 = await conn.read(buf);
+ assertEquals(n2, 6); // this is the second body chunk
+ const n3 = await conn.read(buf);
+ assertEquals(n3, null); // the connection now abruptly closes because the client has errored
+ conn.close();
+ })();
+
+ const stream = new ReadableStream({
+ async start(controller) {
+ controller.enqueue(new TextEncoder().encode("a"));
+ await delay(1000);
+ controller.enqueue(new TextEncoder().encode("b"));
+ await delay(1000);
+ controller.error(new Error("foo"));
+ },
+ });
+
+ const err = await assertRejects(() =>
+ fetch("http://localhost:4514", {
+ body: stream,
+ method: "POST",
+ })
+ );
+
+ assert(err instanceof TypeError);
+ assert(err.cause);
+ assert(err.cause instanceof Error);
+ assertEquals(err.cause.message, "foo");
+
+ await server;
+ },
+);
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index e522079bf..4a18e73f2 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -200,6 +200,8 @@
}
terminator[abortSignal.add](onAbort);
+ let requestSendError;
+ let requestSendErrorSet = false;
if (requestBodyRid !== null) {
if (
reqBody === null ||
@@ -210,44 +212,69 @@
const reader = reqBody.getReader();
WeakMapPrototypeSet(requestBodyReaders, req, reader);
(async () => {
- while (true) {
- const { value, done } = await PromisePrototypeCatch(
- reader.read(),
- (err) => {
- if (terminator.aborted) return { done: true, value: undefined };
- throw err;
- },
- );
+ let done = false;
+ while (!done) {
+ let val;
+ try {
+ const res = await reader.read();
+ done = res.done;
+ val = res.value;
+ } catch (err) {
+ if (terminator.aborted) break;
+ // TODO(lucacasonato): propagate error into response body stream
+ requestSendError = err;
+ requestSendErrorSet = true;
+ break;
+ }
if (done) break;
- if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
- await reader.cancel("value not a Uint8Array");
+ if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) {
+ const error = new TypeError(
+ "Item in request body ReadableStream is not a Uint8Array",
+ );
+ await reader.cancel(error);
+ // TODO(lucacasonato): propagate error into response body stream
+ requestSendError = error;
+ requestSendErrorSet = true;
break;
}
try {
- await PromisePrototypeCatch(
- core.writeAll(requestBodyRid, value),
- (err) => {
- if (terminator.aborted) return;
- throw err;
- },
- );
- if (terminator.aborted) break;
+ await core.writeAll(requestBodyRid, val);
} catch (err) {
+ if (terminator.aborted) break;
await reader.cancel(err);
+ // TODO(lucacasonato): propagate error into response body stream
+ requestSendError = err;
+ requestSendErrorSet = true;
break;
}
}
+ if (done && !terminator.aborted) {
+ try {
+ await core.shutdown(requestBodyRid);
+ } catch (err) {
+ if (!terminator.aborted) {
+ requestSendError = err;
+ requestSendErrorSet = true;
+ }
+ }
+ }
WeakMapPrototypeDelete(requestBodyReaders, req);
core.tryClose(requestBodyRid);
})();
}
-
let resp;
try {
- resp = await PromisePrototypeCatch(opFetchSend(requestRid), (err) => {
- if (terminator.aborted) return;
- throw err;
- });
+ resp = await opFetchSend(requestRid);
+ } catch (err) {
+ if (terminator.aborted) return;
+ if (requestSendErrorSet) {
+ // if the request body stream errored, we want to propagate that error
+ // instead of the original error from opFetchSend
+ throw new TypeError("Failed to fetch: request body stream errored", {
+ cause: requestSendError,
+ });
+ }
+ throw err;
} finally {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs
new file mode 100644
index 000000000..66e29e5a0
--- /dev/null
+++ b/ext/fetch/byte_stream.rs
@@ -0,0 +1,87 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use deno_core::futures::Stream;
+use tokio::sync::mpsc;
+
+/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
+/// used to bridge between the fetch task and the HTTP body stream. The stream
+/// has the special property that it errors if the channel is closed before an
+/// explicit EOF is sent (in the form of a [None] value on the sender).
+pub struct MpscByteStream {
+ receiver: mpsc::Receiver<Option<bytes::Bytes>>,
+ shutdown: bool,
+}
+
+impl MpscByteStream {
+ pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
+ let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
+ let this = Self {
+ receiver,
+ shutdown: false,
+ };
+ (this, sender)
+ }
+}
+
+impl Stream for MpscByteStream {
+ type Item = Result<bytes::Bytes, std::io::Error>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let val = std::task::ready!(self.receiver.poll_recv(cx));
+ match val {
+ None if self.shutdown => Poll::Ready(None),
+ None => Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "channel closed",
+ )))),
+ Some(None) => {
+ self.shutdown = true;
+ Poll::Ready(None)
+ }
+ Some(Some(val)) => Poll::Ready(Some(Ok(val))),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use bytes::Bytes;
+ use deno_core::futures::StreamExt;
+
+ #[tokio::test]
+ async fn success() {
+ let (mut stream, sender) = MpscByteStream::new();
+
+ sender.send(Some(Bytes::from("hello"))).await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ sender.send(Some(Bytes::from("world"))).await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+ sender.send(None).await.unwrap();
+ drop(sender);
+ assert!(stream.next().await.is_none());
+ }
+
+ #[tokio::test]
+ async fn error() {
+ let (mut stream, sender) = MpscByteStream::new();
+
+ sender.send(Some(Bytes::from("hello"))).await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ drop(sender);
+ assert_eq!(
+ stream.next().await.unwrap().unwrap_err().kind(),
+ std::io::ErrorKind::UnexpectedEof
+ );
+ }
+}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index f79fc33f4..c19336e7d 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -1,5 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+mod byte_stream;
mod fs_fetch_handler;
use data_url::DataUrl;
@@ -55,7 +56,6 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use tokio::sync::mpsc;
-use tokio_stream::wrappers::ReceiverStream;
// Re-export reqwest and data_url
pub use data_url;
@@ -63,6 +63,8 @@ pub use reqwest;
pub use fs_fetch_handler::FsFetchHandler;
+use crate::byte_stream::MpscByteStream;
+
#[derive(Clone)]
pub struct Options {
pub user_agent: String,
@@ -256,7 +258,7 @@ where
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
- let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1);
+ let (stream, tx) = MpscByteStream::new();
// If the size of the body is known, we include a content-length
// header explicitly.
@@ -265,7 +267,7 @@ where
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
}
- request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
+ request = request.body(Body::wrap_stream(stream));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
@@ -459,7 +461,7 @@ impl Resource for FetchCancelHandle {
}
pub struct FetchRequestBodyResource {
- body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>,
+ body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
cancel: CancelHandle,
}
@@ -474,13 +476,35 @@ impl Resource for FetchRequestBodyResource {
let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
+ body
+ .send(Some(bytes))
+ .or_cancel(cancel)
+ .await?
+ .map_err(|_| {
+ type_error("request body receiver not connected (request closed)")
+ })?;
Ok(WriteOutcome::Full { nwritten })
})
}
+ fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
+ Box::pin(async move {
+ let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ // There is a case where hyper knows the size of the response body up
+ // front (through content-length header on the resp), where it will drop
+ // the body once that content length has been reached, regardless of if
+ // the stream is complete or not. This is expected behaviour, but it means
+ // that if you stream a body with an up front known size (eg a Blob),
+ // explicit shutdown can never succeed because the body (and by extension
+ // the receiver) will have dropped by the time we try to shutdown. As such
+ // we ignore if the receiver is closed, because we know that the request
+ // is complete in good health in that case.
+ body.send(None).or_cancel(cancel).await?.ok();
+ Ok(())
+ })
+ }
+
fn close(self: Rc<Self>) {
self.cancel.cancel()
}