diff options
author | Luca Casonato <hello@lcas.dev> | 2022-12-19 12:49:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-19 12:49:00 +0100 |
commit | 43b6390629ad62edbeca3b884ccee53422876a1a (patch) | |
tree | e72fb0808b5abb636e29c41c5fa5a7ee2a547435 /ext/fetch/byte_stream.rs | |
parent | 84b70dd2fb780a779930342d21c27e4e368070f1 (diff) |
fix(ext/fetch): handle errors in req body stream (#17081)
Right now an error in a request body stream causes an uncatchable
global promise rejection. This PR fixes this to instead propagate the
error correctly into the promise returned from `fetch`.
It additionally fixes errored readable stream bodies being treated as
successfully completed bodies by Rust.
Diffstat (limited to 'ext/fetch/byte_stream.rs')
-rw-r--r-- | ext/fetch/byte_stream.rs | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs new file mode 100644 index 000000000..66e29e5a0 --- /dev/null +++ b/ext/fetch/byte_stream.rs @@ -0,0 +1,87 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use deno_core::futures::Stream; +use tokio::sync::mpsc; + +/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is +/// used to bridge between the fetch task and the HTTP body stream. The stream +/// has the special property that it errors if the channel is closed before an +/// explicit EOF is sent (in the form of a [None] value on the sender). +pub struct MpscByteStream { + receiver: mpsc::Receiver<Option<bytes::Bytes>>, + shutdown: bool, +} + +impl MpscByteStream { + pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) { + let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1); + let this = Self { + receiver, + shutdown: false, + }; + (this, sender) + } +} + +impl Stream for MpscByteStream { + type Item = Result<bytes::Bytes, std::io::Error>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let val = std::task::ready!(self.receiver.poll_recv(cx)); + match val { + None if self.shutdown => Poll::Ready(None), + None => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "channel closed", + )))), + Some(None) => { + self.shutdown = true; + Poll::Ready(None) + } + Some(Some(val)) => Poll::Ready(Some(Ok(val))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use deno_core::futures::StreamExt; + + #[tokio::test] + async fn success() { + let (mut stream, sender) = MpscByteStream::new(); + + sender.send(Some(Bytes::from("hello"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + sender.send(Some(Bytes::from("world"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + sender.send(None).await.unwrap(); + drop(sender); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn error() { + let (mut stream, sender) = MpscByteStream::new(); + + sender.send(Some(Bytes::from("hello"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + drop(sender); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } +} |