summaryrefslogtreecommitdiff
path: root/ext/fetch/byte_stream.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2022-12-19 12:49:00 +0100
committerGitHub <noreply@github.com>2022-12-19 12:49:00 +0100
commit43b6390629ad62edbeca3b884ccee53422876a1a (patch)
treee72fb0808b5abb636e29c41c5fa5a7ee2a547435 /ext/fetch/byte_stream.rs
parent84b70dd2fb780a779930342d21c27e4e368070f1 (diff)
fix(ext/fetch): handle errors in req body stream (#17081)
Right now an error in a request body stream causes an uncatchable global promise rejection. This PR fixes this to instead propagate the error correctly into the promise returned from `fetch`. It additionally fixes errored readable stream bodies being treated as successfully completed bodies by Rust.
Diffstat (limited to 'ext/fetch/byte_stream.rs')
-rw-r--r--ext/fetch/byte_stream.rs87
1 files changed, 87 insertions, 0 deletions
diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs
new file mode 100644
index 000000000..66e29e5a0
--- /dev/null
+++ b/ext/fetch/byte_stream.rs
@@ -0,0 +1,87 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use deno_core::futures::Stream;
+use tokio::sync::mpsc;
+
+/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
+/// used to bridge between the fetch task and the HTTP body stream. The stream
+/// has the special property that it errors if the channel is closed before an
+/// explicit EOF is sent (in the form of a [None] value on the sender).
+pub struct MpscByteStream {
+ receiver: mpsc::Receiver<Option<bytes::Bytes>>,
+ shutdown: bool,
+}
+
+impl MpscByteStream {
+ pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
+ let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
+ let this = Self {
+ receiver,
+ shutdown: false,
+ };
+ (this, sender)
+ }
+}
+
+impl Stream for MpscByteStream {
+ type Item = Result<bytes::Bytes, std::io::Error>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let val = std::task::ready!(self.receiver.poll_recv(cx));
+ match val {
+ None if self.shutdown => Poll::Ready(None),
+ None => Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "channel closed",
+ )))),
+ Some(None) => {
+ self.shutdown = true;
+ Poll::Ready(None)
+ }
+ Some(Some(val)) => Poll::Ready(Some(Ok(val))),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use bytes::Bytes;
+ use deno_core::futures::StreamExt;
+
+ #[tokio::test]
+ async fn success() {
+ let (mut stream, sender) = MpscByteStream::new();
+
+ sender.send(Some(Bytes::from("hello"))).await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ sender.send(Some(Bytes::from("world"))).await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+ sender.send(None).await.unwrap();
+ drop(sender);
+ assert!(stream.next().await.is_none());
+ }
+
+ #[tokio::test]
+ async fn error() {
+ let (mut stream, sender) = MpscByteStream::new();
+
+ sender.send(Some(Bytes::from("hello"))).await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ drop(sender);
+ assert_eq!(
+ stream.next().await.unwrap().unwrap_err().kind(),
+ std::io::ErrorKind::UnexpectedEof
+ );
+ }
+}