diff options
author | Laurence Rowe <l@lrowe.co.uk> | 2023-11-13 11:17:31 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-13 19:17:31 +0000 |
commit | e5819777c3962079a06c3b09bced1945b1c135f7 (patch) | |
tree | c925baa676de01f8ca026cb61473e1891a6d8254 /ext/http/service.rs | |
parent | 0209f7b46954d1b7bf923b4191e5a356ec09622c (diff) |
refactor(ext/http): Use HttpRecord as response body to track until body completion (#20822)
Use HttpRecord as response body so requests can be tracked all the way
to response body completion.
This allows Request properties to be accessed while the response body is
streaming.
Graceful shutdown now awaits a future instead of async spinning waiting
for requests to finish.
On the minimal benchmark this refactor improves performance an
additional 2% over pooling alone for a net 3% increase over the previous
deno main branch.
Builds upon https://github.com/denoland/deno/pull/20809 and
https://github.com/denoland/deno/pull/20770.
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r-- | ext/http/service.rs | 338 |
1 files changed, 253 insertions, 85 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs index 87b308755..fbd533cac 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -1,13 +1,18 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use crate::request_properties::HttpConnectionProperties; -use crate::response_body::CompletionHandle; -use crate::response_body::ResponseBytes; +use crate::response_body::ResponseBytesInner; +use crate::response_body::ResponseStreamResult; use deno_core::error::AnyError; +use deno_core::futures::ready; +use deno_core::BufView; use deno_core::OpState; use deno_core::ResourceId; use http::request::Parts; use http::HeaderMap; +use hyper1::body::Body; +use hyper1::body::Frame; use hyper1::body::Incoming; +use hyper1::body::SizeHint; use hyper1::upgrade::OnUpgrade; use scopeguard::guard; @@ -16,10 +21,15 @@ use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; use std::future::Future; +use std::mem::ManuallyDrop; +use std::pin::Pin; use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; pub type Request = hyper1::Request<Incoming>; -pub type Response = hyper1::Response<ResponseBytes>; +pub type Response = hyper1::Response<HttpRecordResponse>; macro_rules! http_trace { ($record:expr $(, $args:expr)*) => { @@ -39,6 +49,7 @@ pub(crate) use http_trace; struct HttpServerStateInner { pool: Vec<(Rc<HttpRecord>, HeaderMap)>, + drain_waker: Option<Waker>, } pub struct HttpServerState(RefCell<HttpServerStateInner>); @@ -47,8 +58,32 @@ impl HttpServerState { pub fn new() -> Rc<Self> { Rc::new(Self(RefCell::new(HttpServerStateInner { pool: Vec::new(), + drain_waker: None, }))) } + + pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a { + struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>); + + impl<'a> Future for HttpServerStateDrain<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let server_state = self.0; + http_trace!(server_state, "HttpServerState::drain poll"); + if Rc::strong_count(server_state) <= 1 { + return Poll::Ready(()); + } + server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + HttpServerStateDrain(self) + } } enum RequestBodyState { @@ -102,25 +137,28 @@ pub async fn handle_request( http_trace!(*guarded_record, "handle_request response_ready.await"); guarded_record.response_ready().await; - // Defuse the guard. Must not await after the point. + // Defuse the guard. Must not await after this point. let record = ScopeGuard::into_inner(guarded_record); http_trace!(record, "handle_request complete"); - let response = record.take_response(); - record.recycle(); + let response = record.into_response(); Ok(response) } struct HttpRecordInner { server_state: Rc<HttpServerState>, request_info: HttpConnectionProperties, - request_parts: Parts, + request_parts: http::request::Parts, request_body: Option<RequestBodyState>, - /// The response may get taken before we tear this down - response: Option<Response>, + response_parts: Option<http::response::Parts>, response_ready: bool, - response_waker: Option<std::task::Waker>, - trailers: Rc<RefCell<Option<HeaderMap>>>, + response_waker: Option<Waker>, + response_body: ResponseBytesInner, + response_body_finished: bool, + response_body_waker: Option<Waker>, + trailers: Option<HeaderMap>, been_dropped: bool, + finished: bool, + needs_close_after_finish: bool, } pub struct HttpRecord(RefCell<Option<HttpRecordInner>>); @@ -147,45 +185,72 @@ impl HttpRecord { server_state: Rc<HttpServerState>, ) -> Rc<Self> { let (request_parts, request_body) = request.into_parts(); - let body = ResponseBytes::default(); - let trailers = body.trailers(); let request_body = Some(request_body.into()); - let mut response = Response::new(body); - let reuse_record = + let (mut response_parts, _) = http::Response::new(()).into_parts(); + let record = if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { - *response.headers_mut() = headers; - Some(record) + response_parts.headers = headers; + http_trace!(record, "HttpRecord::reuse"); + record } else { - None + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(None))); + http_trace!(record, "HttpRecord::new"); + record }; - let inner = HttpRecordInner { + *record.0.borrow_mut() = Some(HttpRecordInner { server_state, request_info, request_parts, request_body, - response: Some(response), + response_parts: Some(response_parts), response_ready: false, response_waker: None, - trailers, + response_body: ResponseBytesInner::Empty, + response_body_finished: false, + response_body_waker: None, + trailers: None, been_dropped: false, - }; - if let Some(record) = reuse_record { - *record.0.borrow_mut() = Some(inner); - http_trace!(record, "HttpRecord::reuse"); - record - } else { - #[cfg(feature = "__http_tracing")] - { - RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } + finished: false, + needs_close_after_finish: false, + }); + record + } + + fn finish(self: Rc<Self>) { + http_trace!(self, "HttpRecord::finish"); + let mut inner = self.self_mut(); + inner.response_body_finished = true; + let response_body_waker = inner.response_body_waker.take(); + let needs_close_after_finish = inner.needs_close_after_finish; + drop(inner); + if let Some(waker) = response_body_waker { + waker.wake(); + } + if !needs_close_after_finish { + self.recycle(); + } + } - #[allow(clippy::let_and_return)] - let record = Rc::new(Self(RefCell::new(Some(inner)))); - http_trace!(record, "HttpRecord::new"); - record + pub fn close_after_finish(self: Rc<Self>) { + debug_assert!(self.self_ref().needs_close_after_finish); + let mut inner = self.self_mut(); + inner.needs_close_after_finish = false; + if !inner.finished { + drop(inner); + self.recycle(); } } + pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> { + RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) + } + fn recycle(self: Rc<Self>) { assert!( Rc::strong_count(&self) == 1, @@ -196,12 +261,19 @@ impl HttpRecord { request_parts: Parts { mut headers, .. }, .. } = self.0.borrow_mut().take().unwrap(); + let mut server_state_mut = server_state.0.borrow_mut(); let inflight = Rc::strong_count(&server_state); http_trace!(self, "HttpRecord::recycle inflight={}", inflight); - // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling - // the to-drop objects off to another thread. + // Server is shutting down so wake the drain future. + if let Some(waker) = server_state_mut.drain_waker.take() { + drop(server_state_mut); + drop(server_state); + http_trace!(self, "HttpRecord::recycle wake"); + waker.wake(); + return; + } // Keep a buffer of allocations on hand to be reused by incoming requests. // Estimated target size is 16 + 1/8 the number of inflight requests. @@ -235,7 +307,7 @@ impl HttpRecord { } /// Take the Hyper body from this record. - pub fn take_body(&self) -> Option<Incoming> { + pub fn take_request_body(&self) -> Option<Incoming> { let body_holder = &mut self.self_mut().request_body; let body = body_holder.take(); match body { @@ -247,18 +319,6 @@ impl HttpRecord { } } - pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Resource(res)) => Some(res), - x => { - *body_holder = x; - None - } - } - } - /// Replace the request body with a resource ID and the OpState we'll need to shut it down. /// We cannot keep just the resource itself, as JS code might be reading from the resource ID /// to generate the response data (requiring us to keep it in the resource table). @@ -273,7 +333,7 @@ impl HttpRecord { if inner.response_ready { // Future dropped between wake() and async fn resuming. drop(inner); - self.recycle(); + self.finish(); return; } inner.been_dropped = true; @@ -291,7 +351,7 @@ impl HttpRecord { ); if inner.been_dropped { drop(inner); - self.recycle(); + self.finish(); return; } inner.response_ready = true; @@ -301,25 +361,44 @@ impl HttpRecord { } } + fn take_response_body(&self) -> ResponseBytesInner { + let mut inner = self.self_mut(); + debug_assert!( + !matches!(inner.response_body, ResponseBytesInner::Done), + "HTTP state error: response body already complete" + ); + std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done) + } + /// Has the future for this record been dropped? ie, has the underlying TCP connection /// been closed? pub fn cancelled(&self) -> bool { self.self_ref().been_dropped } - /// Get a mutable reference to the response. - pub fn response(&self) -> RefMut<'_, Response> { - RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap()) + /// Get a mutable reference to the response status and headers. + pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> { + RefMut::map(self.self_mut(), |inner| { + inner.response_parts.as_mut().unwrap() + }) } /// Get a mutable reference to the trailers. - pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> { - Ref::map(self.self_ref(), |inner| &inner.trailers) + pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> { + RefMut::map(self.self_mut(), |inner| &mut inner.trailers) + } + + pub fn set_response_body(&self, response_body: ResponseBytesInner) { + let mut inner = self.self_mut(); + debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty)); + inner.response_body = response_body; } /// Take the response. - fn take_response(&self) -> Response { - self.self_mut().response.take().unwrap() + fn into_response(self: Rc<Self>) -> Response { + let parts = self.self_mut().response_parts.take().unwrap(); + let body = HttpRecordResponse(ManuallyDrop::new(self)); + http::Response::from_parts(parts, body) } /// Get a reference to the connection properties. @@ -332,38 +411,131 @@ impl HttpRecord { Ref::map(self.self_ref(), |inner| &inner.request_parts) } - /// Get a reference to the completion handle. + /// Resolves when response head is ready. fn response_ready(&self) -> impl Future<Output = ()> + '_ { - struct HttpRecordComplete<'a>(&'a HttpRecord); + struct HttpRecordReady<'a>(&'a HttpRecord); - impl<'a> Future for HttpRecordComplete<'a> { + impl<'a> Future for HttpRecordReady<'a> { type Output = (); fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Self::Output> { + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { let mut mut_self = self.0.self_mut(); if mut_self.response_ready { - return std::task::Poll::Ready(()); + return Poll::Ready(()); } mut_self.response_waker = Some(cx.waker().clone()); - std::task::Poll::Pending + Poll::Pending } } - HttpRecordComplete(self) + HttpRecordReady(self) } - /// Get a reference to the response body completion handle. - pub fn body_promise(&self) -> CompletionHandle { - self - .self_ref() - .response - .as_ref() - .unwrap() - .body() - .completion_handle() + /// Resolves when response body has finished streaming. + pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ { + struct HttpRecordFinished<'a>(&'a HttpRecord); + + impl<'a> Future for HttpRecordFinished<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let mut mut_self = self.0.self_mut(); + if mut_self.response_body_finished { + return Poll::Ready(()); + } + mut_self.response_body_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + HttpRecordFinished(self) + } +} + +#[repr(transparent)] +pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>); + +impl Body for HttpRecordResponse { + type Data = BufView; + type Error = AnyError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { + use crate::response_body::PollFrame; + let record = &self.0; + + let res = loop { + let mut inner = record.self_mut(); + let res = match &mut inner.response_body { + ResponseBytesInner::Done | ResponseBytesInner::Empty => { + if let Some(trailers) = inner.trailers.take() { + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } + unreachable!() + } + ResponseBytesInner::Bytes(..) => { + drop(inner); + let ResponseBytesInner::Bytes(data) = record.take_response_body() + else { + unreachable!(); + }; + return Poll::Ready(Some(Ok(Frame::data(data)))); + } + ResponseBytesInner::UncompressedStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + ResponseBytesInner::GZipStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + ResponseBytesInner::BrotliStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + }; + // This is where we retry the NoData response + if matches!(res, ResponseStreamResult::NoData) { + continue; + } + break res; + }; + + if matches!(res, ResponseStreamResult::EndOfStream) { + if let Some(trailers) = record.self_mut().trailers.take() { + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } + record.take_response_body(); + } + Poll::Ready(res.into()) + } + + fn is_end_stream(&self) -> bool { + let inner = self.0.self_ref(); + matches!( + inner.response_body, + ResponseBytesInner::Done | ResponseBytesInner::Empty + ) && inner.trailers.is_none() + } + + fn size_hint(&self) -> SizeHint { + // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through + // anyways just in case hyper needs it. + self.0.self_ref().response_body.size_hint() + } +} + +impl Drop for HttpRecordResponse { + fn drop(&mut self) { + // SAFETY: this ManuallyDrop is not used again. + let record = unsafe { ManuallyDrop::take(&mut self.0) }; + http_trace!(record, "HttpRecordResponse::drop"); + record.finish(); } } @@ -442,14 +614,10 @@ mod tests { async move { // JavaScript handler produces response let record = rx.recv().await.unwrap(); - let resource = record.take_resource(); - record.response().body_mut().initialize( - ResponseBytesInner::from_vec( - Compression::None, - b"hello world".to_vec(), - ), - resource, - ); + record.set_response_body(ResponseBytesInner::from_vec( + Compression::None, + b"hello world".to_vec(), + )); record.complete(); Ok(()) }, |