summaryrefslogtreecommitdiff
path: root/ext/http/service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r--ext/http/service.rs146
1 files changed, 97 insertions, 49 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs
index fbd533cac..c232962be 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -17,6 +17,7 @@ use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
use scopeguard::ScopeGuard;
+use std::cell::Cell;
use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
@@ -31,12 +32,34 @@ use std::task::Waker;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<HttpRecordResponse>;
+#[cfg(feature = "__http_tracing")]
+pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
+ std::sync::atomic::AtomicUsize::new(0);
+
+macro_rules! http_general_trace {
+ ($($args:expr),*) => {
+ #[cfg(feature = "__http_tracing")]
+ {
+ let count = $crate::service::RECORD_COUNT
+ .load(std::sync::atomic::Ordering::SeqCst);
+
+ println!(
+ "HTTP [+{count}]: {}",
+ format!($($args),*),
+ );
+ }
+ };
+}
+
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
#[cfg(feature = "__http_tracing")]
{
+ let count = $crate::service::RECORD_COUNT
+ .load(std::sync::atomic::Ordering::SeqCst);
+
println!(
- "HTTP id={:p} strong={}: {}",
+ "HTTP [+{count}] id={:p} strong={}: {}",
$record,
std::rc::Rc::strong_count(&$record),
format!($($args),*),
@@ -45,44 +68,83 @@ macro_rules! http_trace {
};
}
+pub(crate) use http_general_trace;
pub(crate) use http_trace;
-struct HttpServerStateInner {
+pub(crate) struct HttpServerStateInner {
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
- drain_waker: Option<Waker>,
}
-pub struct HttpServerState(RefCell<HttpServerStateInner>);
+/// A signalling version of `Rc` that allows one to poll for when all other references
+/// to the `Rc` have been dropped.
+#[repr(transparent)]
+pub(crate) struct SignallingRc<T>(Rc<(T, Cell<Option<Waker>>)>);
-impl HttpServerState {
- pub fn new() -> Rc<Self> {
- Rc::new(Self(RefCell::new(HttpServerStateInner {
- pool: Vec::new(),
- drain_waker: None,
- })))
+impl<T> SignallingRc<T> {
+ #[inline]
+ pub fn new(t: T) -> Self {
+ Self(Rc::new((t, Default::default())))
}
- pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
- struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);
+ #[inline]
+ pub fn strong_count(&self) -> usize {
+ Rc::strong_count(&self.0)
+ }
- impl<'a> Future for HttpServerStateDrain<'a> {
- type Output = ();
+ /// Resolves when this is the only remaining reference.
+ #[inline]
+ pub fn poll_complete(&self, cx: &mut Context<'_>) -> Poll<()> {
+ if Rc::strong_count(&self.0) == 1 {
+ Poll::Ready(())
+ } else {
+ self.0 .1.set(Some(cx.waker().clone()));
+ Poll::Pending
+ }
+ }
+}
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Self::Output> {
- let server_state = self.0;
- http_trace!(server_state, "HttpServerState::drain poll");
- if Rc::strong_count(server_state) <= 1 {
- return Poll::Ready(());
- }
- server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone());
- Poll::Pending
+impl<T> Clone for SignallingRc<T> {
+ #[inline]
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+impl<T> Drop for SignallingRc<T> {
+ #[inline]
+ fn drop(&mut self) {
+ // Trigger the waker iff the refcount is about to become 1.
+ if Rc::strong_count(&self.0) == 2 {
+ if let Some(waker) = self.0 .1.take() {
+ waker.wake();
}
}
+ }
+}
+
+impl<T> std::ops::Deref for SignallingRc<T> {
+ type Target = T;
+ #[inline]
+ fn deref(&self) -> &Self::Target {
+ &self.0 .0
+ }
+}
+
+pub(crate) struct HttpServerState(RefCell<HttpServerStateInner>);
+
+impl HttpServerState {
+ pub fn new() -> SignallingRc<Self> {
+ SignallingRc::new(Self(RefCell::new(HttpServerStateInner {
+ pool: Vec::new(),
+ })))
+ }
+}
+
+impl std::ops::Deref for HttpServerState {
+ type Target = RefCell<HttpServerStateInner>;
- HttpServerStateDrain(self)
+ fn deref(&self) -> &Self::Target {
+ &self.0
}
}
@@ -117,7 +179,7 @@ impl Drop for HttpRequestBodyAutocloser {
pub async fn handle_request(
request: Request,
request_info: HttpConnectionProperties,
- server_state: Rc<HttpServerState>, // Keep server alive for duration of this future.
+ server_state: SignallingRc<HttpServerState>, // Keep server alive for duration of this future.
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> Result<Response, hyper::Error> {
// If the underlying TCP connection is closed, this future will be dropped
@@ -145,7 +207,7 @@ pub async fn handle_request(
}
struct HttpRecordInner {
- server_state: Rc<HttpServerState>,
+ server_state: SignallingRc<HttpServerState>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@@ -164,17 +226,13 @@ struct HttpRecordInner {
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
#[cfg(feature = "__http_tracing")]
-pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
- std::sync::atomic::AtomicUsize::new(0);
-
-#[cfg(feature = "__http_tracing")]
impl Drop for HttpRecord {
fn drop(&mut self) {
- let count = RECORD_COUNT
+ RECORD_COUNT
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
.checked_sub(1)
.expect("Count went below zero");
- println!("HTTP count={count}: HttpRecord::drop");
+ http_general_trace!("HttpRecord::drop");
}
}
@@ -182,13 +240,13 @@ impl HttpRecord {
fn new(
request: Request,
request_info: HttpConnectionProperties,
- server_state: Rc<HttpServerState>,
+ server_state: SignallingRc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let request_body = Some(request_body.into());
let (mut response_parts, _) = http::Response::new(()).into_parts();
let record =
- if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
+ if let Some((record, headers)) = server_state.borrow_mut().pool.pop() {
response_parts.headers = headers;
http_trace!(record, "HttpRecord::reuse");
record
@@ -262,23 +320,13 @@ impl HttpRecord {
..
} = self.0.borrow_mut().take().unwrap();
- let mut server_state_mut = server_state.0.borrow_mut();
- let inflight = Rc::strong_count(&server_state);
+ let inflight = server_state.strong_count();
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
- // Server is shutting down so wake the drain future.
- if let Some(waker) = server_state_mut.drain_waker.take() {
- drop(server_state_mut);
- drop(server_state);
- http_trace!(self, "HttpRecord::recycle wake");
- waker.wake();
- return;
- }
-
// Keep a buffer of allocations on hand to be reused by incoming requests.
// Estimated target size is 16 + 1/8 the number of inflight requests.
let target = 16 + (inflight >> 3);
- let pool = &mut server_state_mut.pool;
+ let pool = &mut server_state.borrow_mut().pool;
if target > pool.len() {
headers.clear();
pool.push((self, headers));
@@ -634,7 +682,7 @@ mod tests {
.await
},
)?;
- assert_eq!(Rc::strong_count(&server_state_check), 1);
+ assert_eq!(server_state_check.strong_count(), 1);
Ok(())
}
}