diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/http/request_body.rs | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs index 73908ca55..0c3f29320 100644 --- a/ext/http/request_body.rs +++ b/ext/http/request_body.rs @@ -15,6 +15,8 @@ use hyper1::body::SizeHint; use std::borrow::Cow; use std::pin::Pin; use std::rc::Rc; +use std::task::ready; +use std::task::Poll; /// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8. struct ReadFuture(Incoming); @@ -25,21 +27,26 @@ impl Stream for ReadFuture { fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Self::Item>> { - let res = Pin::new(&mut self.get_mut().0).poll_frame(cx); - match res { - std::task::Poll::Ready(Some(Ok(frame))) => { - if let Ok(data) = frame.into_data() { - // Ensure that we never yield an empty frame - if !data.is_empty() { - return std::task::Poll::Ready(Some(Ok(data))); + ) -> Poll<Option<Self::Item>> { + // Loop until we receive a non-empty frame from Hyper + let this = self.get_mut(); + loop { + let res = ready!(Pin::new(&mut this.0).poll_frame(cx)); + break match res { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + // Ensure that we never yield an empty frame + if !data.is_empty() { + break Poll::Ready(Some(Ok::<_, AnyError>(data))); + } } + // Loop again so we don't lose the waker + continue; } - } - std::task::Poll::Ready(None) => return std::task::Poll::Ready(None), - _ => {} + Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), + None => Poll::Ready(None), + }; } - std::task::Poll::Pending } } |