diff options
author | Laurence Rowe <l@lrowe.co.uk> | 2023-11-13 09:32:34 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-13 10:32:34 -0700 |
commit | 25950baed347fa2311ecd59ae569a5d3ab4851f5 (patch) | |
tree | 2a6091e6519b74e62a36438dabcf3444ebb37090 /ext/http/service.rs | |
parent | 6e1f3aa0131f145db631d5dfb94c2b9404172194 (diff) |
perf(ext/http): Object pooling for HttpRecord and HeaderMap (#20809)
Reuse existing existing allocations for HttpRecord and response
HeaderMap where possible.
At request end used allocations are returned to the pool and the pool
and the pool sized to 1/8th the current number of inflight requests.
For http1 hyper will reuse the response HeaderMap for the following
request on the connection.
Builds upon https://github.com/denoland/deno/pull/20770
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r-- | ext/http/service.rs | 141 |
1 files changed, 106 insertions, 35 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs index ea67980f3..87b308755 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -22,14 +22,14 @@ pub type Request = hyper1::Request<Incoming>; pub type Response = hyper1::Response<ResponseBytes>; macro_rules! http_trace { - ($record:expr, $args:tt) => { + ($record:expr $(, $args:expr)*) => { #[cfg(feature = "__http_tracing")] { println!( "HTTP id={:p} strong={}: {}", $record, std::rc::Rc::strong_count(&$record), - format!($args), + format!($($args),*), ); } }; @@ -37,9 +37,19 @@ macro_rules! http_trace { pub(crate) use http_trace; -#[repr(transparent)] -#[derive(Clone, Default)] -pub struct RefCount(pub Rc<()>); +struct HttpServerStateInner { + pool: Vec<(Rc<HttpRecord>, HeaderMap)>, +} + +pub struct HttpServerState(RefCell<HttpServerStateInner>); + +impl HttpServerState { + pub fn new() -> Rc<Self> { + Rc::new(Self(RefCell::new(HttpServerStateInner { + pool: Vec::new(), + }))) + } +} enum RequestBodyState { Incoming(Incoming), @@ -72,15 +82,17 @@ impl Drop for HttpRequestBodyAutocloser { pub async fn handle_request( request: Request, request_info: HttpConnectionProperties, - _refcount: RefCount, // Keep server alive for duration of this future. + server_state: Rc<HttpServerState>, // Keep server alive for duration of this future. tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, ) -> Result<Response, hyper::Error> { // If the underlying TCP connection is closed, this future will be dropped // and execution could stop at any await point. // The HttpRecord must live until JavaScript is done processing so is wrapped // in an Rc. The guard ensures unneeded resources are freed at cancellation. - let guarded_record = - guard(HttpRecord::new(request, request_info), HttpRecord::cancel); + let guarded_record = guard( + HttpRecord::new(request, request_info, server_state), + HttpRecord::cancel, + ); // Clone HttpRecord and send to JavaScript for processing. // Safe to unwrap as channel receiver is never closed. @@ -93,15 +105,13 @@ pub async fn handle_request( // Defuse the guard. Must not await after the point. let record = ScopeGuard::into_inner(guarded_record); http_trace!(record, "handle_request complete"); - assert!( - Rc::strong_count(&record) == 1, - "HTTP state error: Expected to be last strong reference (handle_request)" - ); let response = record.take_response(); + record.recycle(); Ok(response) } struct HttpRecordInner { + server_state: Rc<HttpServerState>, request_info: HttpConnectionProperties, request_parts: Parts, request_body: Option<RequestBodyState>, @@ -113,7 +123,7 @@ struct HttpRecordInner { been_dropped: bool, } -pub struct HttpRecord(RefCell<HttpRecordInner>); +pub struct HttpRecord(RefCell<Option<HttpRecordInner>>); #[cfg(feature = "__http_tracing")] pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = @@ -131,37 +141,86 @@ impl Drop for HttpRecord { } impl HttpRecord { - fn new(request: Request, request_info: HttpConnectionProperties) -> Rc<Self> { - #[cfg(feature = "__http_tracing")] - { - RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } + fn new( + request: Request, + request_info: HttpConnectionProperties, + server_state: Rc<HttpServerState>, + ) -> Rc<Self> { let (request_parts, request_body) = request.into_parts(); let body = ResponseBytes::default(); let trailers = body.trailers(); let request_body = Some(request_body.into()); + let mut response = Response::new(body); + let reuse_record = + if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { + *response.headers_mut() = headers; + Some(record) + } else { + None + }; let inner = HttpRecordInner { + server_state, request_info, request_parts, request_body, - response: Some(Response::new(body)), + response: Some(response), response_ready: false, response_waker: None, trailers, been_dropped: false, }; - #[allow(clippy::let_and_return)] - let record = Rc::new(Self(RefCell::new(inner))); - http_trace!(record, "HttpRecord::new"); - record + if let Some(record) = reuse_record { + *record.0.borrow_mut() = Some(inner); + http_trace!(record, "HttpRecord::reuse"); + record + } else { + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(Some(inner)))); + http_trace!(record, "HttpRecord::new"); + record + } + } + + fn recycle(self: Rc<Self>) { + assert!( + Rc::strong_count(&self) == 1, + "HTTP state error: Expected to be last strong reference" + ); + let HttpRecordInner { + server_state, + request_parts: Parts { mut headers, .. }, + .. + } = self.0.borrow_mut().take().unwrap(); + let mut server_state_mut = server_state.0.borrow_mut(); + let inflight = Rc::strong_count(&server_state); + http_trace!(self, "HttpRecord::recycle inflight={}", inflight); + + // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling + // the to-drop objects off to another thread. + + // Keep a buffer of allocations on hand to be reused by incoming requests. + // Estimated target size is 16 + 1/8 the number of inflight requests. + let target = 16 + (inflight >> 3); + let pool = &mut server_state_mut.pool; + if target > pool.len() { + headers.clear(); + pool.push((self, headers)); + } else if target < pool.len() - 8 { + pool.truncate(target); + } } fn self_ref(&self) -> Ref<'_, HttpRecordInner> { - self.0.borrow() + Ref::map(self.0.borrow(), |option| option.as_ref().unwrap()) } fn self_mut(&self) -> RefMut<'_, HttpRecordInner> { - self.0.borrow_mut() + RefMut::map(self.0.borrow_mut(), |option| option.as_mut().unwrap()) } /// Perform the Hyper upgrade on this record. @@ -210,7 +269,13 @@ impl HttpRecord { /// Cleanup resources not needed after the future is dropped. fn cancel(self: Rc<Self>) { http_trace!(self, "HttpRecord::cancel"); - let mut inner = self.0.borrow_mut(); + let mut inner = self.self_mut(); + if inner.response_ready { + // Future dropped between wake() and async fn resuming. + drop(inner); + self.recycle(); + return; + } inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); @@ -221,13 +286,14 @@ impl HttpRecord { http_trace!(self, "HttpRecord::complete"); let mut inner = self.self_mut(); assert!( - !inner.been_dropped || Rc::strong_count(&self) == 1, - "HTTP state error: Expected to be last strong reference (been_dropped)" - ); - assert!( !inner.response_ready, "HTTP state error: Entry has already been completed" ); + if inner.been_dropped { + drop(inner); + self.recycle(); + return; + } inner.response_ready = true; if let Some(waker) = inner.response_waker.take() { drop(inner); @@ -277,7 +343,7 @@ impl HttpRecord { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output> { - let mut mut_self = self.0 .0.borrow_mut(); + let mut mut_self = self.0.self_mut(); if mut_self.response_ready { return std::task::Poll::Ready(()); } @@ -352,8 +418,8 @@ mod tests { #[tokio::test] async fn test_handle_request() -> Result<(), AnyError> { let (tx, mut rx) = tokio::sync::mpsc::channel(10); - let refcount = RefCount::default(); - let refcount_check = refcount.clone(); + let server_state = HttpServerState::new(); + let server_state_check = server_state.clone(); let request_info = HttpConnectionProperties { peer_address: "".into(), peer_port: None, @@ -361,7 +427,12 @@ mod tests { stream_type: NetworkStreamType::Tcp, }; let svc = service_fn(move |req: hyper1::Request<Incoming>| { - handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request( + req, + request_info.clone(), + server_state.clone(), + tx.clone(), + ) }); let client_req = http::Request::builder().uri("/").body("".to_string())?; @@ -395,7 +466,7 @@ mod tests { .await }, )?; - assert_eq!(Rc::strong_count(&refcount_check.0), 1); + assert_eq!(Rc::strong_count(&server_state_check), 1); Ok(()) } } |