diff options
author | Laurence Rowe <l@lrowe.co.uk> | 2023-11-13 11:17:31 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-13 19:17:31 +0000 |
commit | e5819777c3962079a06c3b09bced1945b1c135f7 (patch) | |
tree | c925baa676de01f8ca026cb61473e1891a6d8254 /ext/http/response_body.rs | |
parent | 0209f7b46954d1b7bf923b4191e5a356ec09622c (diff) |
refactor(ext/http): Use HttpRecord as response body to track until body completion (#20822)
Use HttpRecord as response body so requests can be tracked all the way
to response body completion.
This allows Request properties to be accessed while the response body is
streaming.
Graceful shutdown now awaits a future instead of async spinning waiting
for requests to finish.
On the minimal benchmark this refactor improves performance an
additional 2% over pooling alone for a net 3% increase over the previous
deno main branch.
Builds upon https://github.com/denoland/deno/pull/20809 and
https://github.com/denoland/deno/pull/20770.
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r-- | ext/http/response_body.rs | 171 |
1 files changed, 2 insertions, 169 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 7d91dce6b..7201855cc 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -1,10 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::cell::RefCell; -use std::future::Future; use std::io::Write; use std::pin::Pin; use std::rc::Rc; -use std::task::Waker; use brotli::enc::encode::BrotliEncoderParameter; use brotli::ffi::compressor::BrotliEncoderState; @@ -18,16 +15,13 @@ use deno_core::BufView; use deno_core::Resource; use flate2::write::GzEncoder; use http::HeaderMap; -use hyper1::body::Body; use hyper1::body::Frame; use hyper1::body::SizeHint; use pin_project::pin_project; -use crate::service::HttpRequestBodyAutocloser; - /// Simplification for nested types we use for our streams. We provide a way to convert from /// this type into Hyper's body [`Frame`]. -enum ResponseStreamResult { +pub enum ResponseStreamResult { /// Stream is over. EndOfStream, /// Stream provided non-empty data. @@ -57,53 +51,7 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> { } } -#[derive(Clone, Debug, Default)] -pub struct CompletionHandle { - inner: Rc<RefCell<CompletionHandleInner>>, -} - -#[derive(Debug, Default)] -struct CompletionHandleInner { - complete: bool, - success: bool, - waker: Option<Waker>, -} - -impl CompletionHandle { - pub fn complete(&self, success: bool) { - let mut mut_self = self.inner.borrow_mut(); - mut_self.complete = true; - mut_self.success = success; - if let Some(waker) = mut_self.waker.take() { - drop(mut_self); - waker.wake(); - } - } - - #[allow(dead_code)] - pub fn is_completed(&self) -> bool { - self.inner.borrow().complete - } -} - -impl Future for CompletionHandle { - type Output = bool; - - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Self::Output> { - let mut mut_self = self.inner.borrow_mut(); - if mut_self.complete { - return std::task::Poll::Ready(mut_self.success); - } - - mut_self.waker = Some(cx.waker().clone()); - std::task::Poll::Pending - } -} - -trait PollFrame: Unpin { +pub trait PollFrame: Unpin { fn poll_frame( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -166,52 +114,6 @@ impl std::fmt::Debug for ResponseBytesInner { } } -/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface -/// required by hyper. As the API requires information about request completion (including a success/fail -/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on. -#[derive(Default)] -pub struct ResponseBytes { - inner: ResponseBytesInner, - completion_handle: CompletionHandle, - headers: Rc<RefCell<Option<HeaderMap>>>, - res: Option<HttpRequestBodyAutocloser>, -} - -impl ResponseBytes { - pub fn initialize( - &mut self, - inner: ResponseBytesInner, - req_body_resource: Option<HttpRequestBodyAutocloser>, - ) { - debug_assert!(matches!(self.inner, ResponseBytesInner::Empty)); - self.inner = inner; - self.res = req_body_resource; - } - - pub fn completion_handle(&self) -> CompletionHandle { - self.completion_handle.clone() - } - - pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> { - self.headers.clone() - } - - fn complete(&mut self, success: bool) -> ResponseBytesInner { - if matches!(self.inner, ResponseBytesInner::Done) { - return ResponseBytesInner::Done; - } - - let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); - self.completion_handle.complete(success); - if success { - current - } else { - current.abort(); - ResponseBytesInner::Done - } - } -} - impl ResponseBytesInner { pub fn abort(self) { match self { @@ -298,75 +200,6 @@ impl ResponseBytesInner { } } -impl Body for ResponseBytes { - type Data = BufView; - type Error = AnyError; - - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { - let res = loop { - let res = match &mut self.inner { - ResponseBytesInner::Done | ResponseBytesInner::Empty => { - if let Some(trailers) = self.headers.borrow_mut().take() { - return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); - } - unreachable!() - } - ResponseBytesInner::Bytes(..) => { - let ResponseBytesInner::Bytes(data) = self.complete(true) else { - unreachable!(); - }; - return std::task::Poll::Ready(Some(Ok(Frame::data(data)))); - } - ResponseBytesInner::UncompressedStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - ResponseBytesInner::GZipStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - ResponseBytesInner::BrotliStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - }; - // This is where we retry the NoData response - if matches!(res, ResponseStreamResult::NoData) { - continue; - } - break res; - }; - - if matches!(res, ResponseStreamResult::EndOfStream) { - if let Some(trailers) = self.headers.borrow_mut().take() { - return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); - } - self.complete(true); - } - std::task::Poll::Ready(res.into()) - } - - fn is_end_stream(&self) -> bool { - matches!( - self.inner, - ResponseBytesInner::Done | ResponseBytesInner::Empty - ) && self.headers.borrow_mut().is_none() - } - - fn size_hint(&self) -> SizeHint { - // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through - // anyways just in case hyper needs it. - self.inner.size_hint() - } -} - -impl Drop for ResponseBytes { - fn drop(&mut self) { - // We won't actually poll_frame for Empty responses so this is where we return success - self.complete(matches!(self.inner, ResponseBytesInner::Empty)); - } -} - pub struct ResourceBodyAdapter { auto_close: bool, stm: Rc<dyn Resource>, |