summaryrefslogtreecommitdiff
path: root/ext/http/service.rs
diff options
context:
space:
mode:
authorLaurence Rowe <l@lrowe.co.uk>2023-11-13 09:32:34 -0800
committerGitHub <noreply@github.com>2023-11-13 10:32:34 -0700
commit25950baed347fa2311ecd59ae569a5d3ab4851f5 (patch)
tree2a6091e6519b74e62a36438dabcf3444ebb37090 /ext/http/service.rs
parent6e1f3aa0131f145db631d5dfb94c2b9404172194 (diff)
perf(ext/http): Object pooling for HttpRecord and HeaderMap (#20809)
Reuse existing existing allocations for HttpRecord and response HeaderMap where possible. At request end used allocations are returned to the pool and the pool and the pool sized to 1/8th the current number of inflight requests. For http1 hyper will reuse the response HeaderMap for the following request on the connection. Builds upon https://github.com/denoland/deno/pull/20770 --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r--ext/http/service.rs141
1 files changed, 106 insertions, 35 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs
index ea67980f3..87b308755 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -22,14 +22,14 @@ pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
macro_rules! http_trace {
- ($record:expr, $args:tt) => {
+ ($record:expr $(, $args:expr)*) => {
#[cfg(feature = "__http_tracing")]
{
println!(
"HTTP id={:p} strong={}: {}",
$record,
std::rc::Rc::strong_count(&$record),
- format!($args),
+ format!($($args),*),
);
}
};
@@ -37,9 +37,19 @@ macro_rules! http_trace {
pub(crate) use http_trace;
-#[repr(transparent)]
-#[derive(Clone, Default)]
-pub struct RefCount(pub Rc<()>);
+struct HttpServerStateInner {
+ pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
+}
+
+pub struct HttpServerState(RefCell<HttpServerStateInner>);
+
+impl HttpServerState {
+ pub fn new() -> Rc<Self> {
+ Rc::new(Self(RefCell::new(HttpServerStateInner {
+ pool: Vec::new(),
+ })))
+ }
+}
enum RequestBodyState {
Incoming(Incoming),
@@ -72,15 +82,17 @@ impl Drop for HttpRequestBodyAutocloser {
pub async fn handle_request(
request: Request,
request_info: HttpConnectionProperties,
- _refcount: RefCount, // Keep server alive for duration of this future.
+ server_state: Rc<HttpServerState>, // Keep server alive for duration of this future.
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> Result<Response, hyper::Error> {
// If the underlying TCP connection is closed, this future will be dropped
// and execution could stop at any await point.
// The HttpRecord must live until JavaScript is done processing so is wrapped
// in an Rc. The guard ensures unneeded resources are freed at cancellation.
- let guarded_record =
- guard(HttpRecord::new(request, request_info), HttpRecord::cancel);
+ let guarded_record = guard(
+ HttpRecord::new(request, request_info, server_state),
+ HttpRecord::cancel,
+ );
// Clone HttpRecord and send to JavaScript for processing.
// Safe to unwrap as channel receiver is never closed.
@@ -93,15 +105,13 @@ pub async fn handle_request(
// Defuse the guard. Must not await after the point.
let record = ScopeGuard::into_inner(guarded_record);
http_trace!(record, "handle_request complete");
- assert!(
- Rc::strong_count(&record) == 1,
- "HTTP state error: Expected to be last strong reference (handle_request)"
- );
let response = record.take_response();
+ record.recycle();
Ok(response)
}
struct HttpRecordInner {
+ server_state: Rc<HttpServerState>,
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<RequestBodyState>,
@@ -113,7 +123,7 @@ struct HttpRecordInner {
been_dropped: bool,
}
-pub struct HttpRecord(RefCell<HttpRecordInner>);
+pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
#[cfg(feature = "__http_tracing")]
pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
@@ -131,37 +141,86 @@ impl Drop for HttpRecord {
}
impl HttpRecord {
- fn new(request: Request, request_info: HttpConnectionProperties) -> Rc<Self> {
- #[cfg(feature = "__http_tracing")]
- {
- RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
- }
+ fn new(
+ request: Request,
+ request_info: HttpConnectionProperties,
+ server_state: Rc<HttpServerState>,
+ ) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let body = ResponseBytes::default();
let trailers = body.trailers();
let request_body = Some(request_body.into());
+ let mut response = Response::new(body);
+ let reuse_record =
+ if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
+ *response.headers_mut() = headers;
+ Some(record)
+ } else {
+ None
+ };
let inner = HttpRecordInner {
+ server_state,
request_info,
request_parts,
request_body,
- response: Some(Response::new(body)),
+ response: Some(response),
response_ready: false,
response_waker: None,
trailers,
been_dropped: false,
};
- #[allow(clippy::let_and_return)]
- let record = Rc::new(Self(RefCell::new(inner)));
- http_trace!(record, "HttpRecord::new");
- record
+ if let Some(record) = reuse_record {
+ *record.0.borrow_mut() = Some(inner);
+ http_trace!(record, "HttpRecord::reuse");
+ record
+ } else {
+ #[cfg(feature = "__http_tracing")]
+ {
+ RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ }
+
+ #[allow(clippy::let_and_return)]
+ let record = Rc::new(Self(RefCell::new(Some(inner))));
+ http_trace!(record, "HttpRecord::new");
+ record
+ }
+ }
+
+ fn recycle(self: Rc<Self>) {
+ assert!(
+ Rc::strong_count(&self) == 1,
+ "HTTP state error: Expected to be last strong reference"
+ );
+ let HttpRecordInner {
+ server_state,
+ request_parts: Parts { mut headers, .. },
+ ..
+ } = self.0.borrow_mut().take().unwrap();
+ let mut server_state_mut = server_state.0.borrow_mut();
+ let inflight = Rc::strong_count(&server_state);
+ http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
+
+ // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling
+ // the to-drop objects off to another thread.
+
+ // Keep a buffer of allocations on hand to be reused by incoming requests.
+ // Estimated target size is 16 + 1/8 the number of inflight requests.
+ let target = 16 + (inflight >> 3);
+ let pool = &mut server_state_mut.pool;
+ if target > pool.len() {
+ headers.clear();
+ pool.push((self, headers));
+ } else if target < pool.len() - 8 {
+ pool.truncate(target);
+ }
}
fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
- self.0.borrow()
+ Ref::map(self.0.borrow(), |option| option.as_ref().unwrap())
}
fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
- self.0.borrow_mut()
+ RefMut::map(self.0.borrow_mut(), |option| option.as_mut().unwrap())
}
/// Perform the Hyper upgrade on this record.
@@ -210,7 +269,13 @@ impl HttpRecord {
/// Cleanup resources not needed after the future is dropped.
fn cancel(self: Rc<Self>) {
http_trace!(self, "HttpRecord::cancel");
- let mut inner = self.0.borrow_mut();
+ let mut inner = self.self_mut();
+ if inner.response_ready {
+ // Future dropped between wake() and async fn resuming.
+ drop(inner);
+ self.recycle();
+ return;
+ }
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
@@ -221,13 +286,14 @@ impl HttpRecord {
http_trace!(self, "HttpRecord::complete");
let mut inner = self.self_mut();
assert!(
- !inner.been_dropped || Rc::strong_count(&self) == 1,
- "HTTP state error: Expected to be last strong reference (been_dropped)"
- );
- assert!(
!inner.response_ready,
"HTTP state error: Entry has already been completed"
);
+ if inner.been_dropped {
+ drop(inner);
+ self.recycle();
+ return;
+ }
inner.response_ready = true;
if let Some(waker) = inner.response_waker.take() {
drop(inner);
@@ -277,7 +343,7 @@ impl HttpRecord {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
- let mut mut_self = self.0 .0.borrow_mut();
+ let mut mut_self = self.0.self_mut();
if mut_self.response_ready {
return std::task::Poll::Ready(());
}
@@ -352,8 +418,8 @@ mod tests {
#[tokio::test]
async fn test_handle_request() -> Result<(), AnyError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
- let refcount = RefCount::default();
- let refcount_check = refcount.clone();
+ let server_state = HttpServerState::new();
+ let server_state_check = server_state.clone();
let request_info = HttpConnectionProperties {
peer_address: "".into(),
peer_port: None,
@@ -361,7 +427,12 @@ mod tests {
stream_type: NetworkStreamType::Tcp,
};
let svc = service_fn(move |req: hyper1::Request<Incoming>| {
- handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(
+ req,
+ request_info.clone(),
+ server_state.clone(),
+ tx.clone(),
+ )
});
let client_req = http::Request::builder().uri("/").body("".to_string())?;
@@ -395,7 +466,7 @@ mod tests {
.await
},
)?;
- assert_eq!(Rc::strong_count(&refcount_check.0), 1);
+ assert_eq!(Rc::strong_count(&server_state_check), 1);
Ok(())
}
}