From 71af3c375c229e3311e4c82350025d1955cfa123 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Fri, 15 Sep 2023 08:08:21 -0600 Subject: fix(ext/http): ensure aborted bodies throw (#20503) Fixes #20502 -- ensure that Hyper errors make it through to JS. --- ext/http/request_body.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) (limited to 'ext') diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs index 73908ca55..0c3f29320 100644 --- a/ext/http/request_body.rs +++ b/ext/http/request_body.rs @@ -15,6 +15,8 @@ use hyper1::body::SizeHint; use std::borrow::Cow; use std::pin::Pin; use std::rc::Rc; +use std::task::ready; +use std::task::Poll; /// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8. struct ReadFuture(Incoming); @@ -25,21 +27,26 @@ impl Stream for ReadFuture { fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let res = Pin::new(&mut self.get_mut().0).poll_frame(cx); - match res { - std::task::Poll::Ready(Some(Ok(frame))) => { - if let Ok(data) = frame.into_data() { - // Ensure that we never yield an empty frame - if !data.is_empty() { - return std::task::Poll::Ready(Some(Ok(data))); + ) -> Poll> { + // Loop until we receive a non-empty frame from Hyper + let this = self.get_mut(); + loop { + let res = ready!(Pin::new(&mut this.0).poll_frame(cx)); + break match res { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + // Ensure that we never yield an empty frame + if !data.is_empty() { + break Poll::Ready(Some(Ok::<_, AnyError>(data))); + } } + // Loop again so we don't lose the waker + continue; } - } - std::task::Poll::Ready(None) => return std::task::Poll::Ready(None), - _ => {} + Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), + None => Poll::Ready(None), + }; } - std::task::Poll::Pending } } -- cgit v1.2.3