From 43b6390629ad62edbeca3b884ccee53422876a1a Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 19 Dec 2022 12:49:00 +0100 Subject: fix(ext/fetch): handle errors in req body stream (#17081) Right now an error in a request body stream causes an uncatchable global promise rejection. This PR fixes this to instead propagate the error correctly into the promise returned from `fetch`. It additionally fixes errored readable stream bodies being treated as successfully completed bodies by Rust. --- ext/fetch/byte_stream.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 ext/fetch/byte_stream.rs (limited to 'ext/fetch/byte_stream.rs') diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs new file mode 100644 index 000000000..66e29e5a0 --- /dev/null +++ b/ext/fetch/byte_stream.rs @@ -0,0 +1,87 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use deno_core::futures::Stream; +use tokio::sync::mpsc; + +/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is +/// used to bridge between the fetch task and the HTTP body stream. The stream +/// has the special property that it errors if the channel is closed before an +/// explicit EOF is sent (in the form of a [None] value on the sender). +pub struct MpscByteStream { + receiver: mpsc::Receiver>, + shutdown: bool, +} + +impl MpscByteStream { + pub fn new() -> (Self, mpsc::Sender>) { + let (sender, receiver) = mpsc::channel::>(1); + let this = Self { + receiver, + shutdown: false, + }; + (this, sender) + } +} + +impl Stream for MpscByteStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let val = std::task::ready!(self.receiver.poll_recv(cx)); + match val { + None if self.shutdown => Poll::Ready(None), + None => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "channel closed", + )))), + Some(None) => { + self.shutdown = true; + Poll::Ready(None) + } + Some(Some(val)) => Poll::Ready(Some(Ok(val))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use deno_core::futures::StreamExt; + + #[tokio::test] + async fn success() { + let (mut stream, sender) = MpscByteStream::new(); + + sender.send(Some(Bytes::from("hello"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + sender.send(Some(Bytes::from("world"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + sender.send(None).await.unwrap(); + drop(sender); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn error() { + let (mut stream, sender) = MpscByteStream::new(); + + sender.send(Some(Bytes::from("hello"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + drop(sender); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } +} -- cgit v1.2.3