summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/http/http_next.rs13
-rw-r--r--ext/http/service.rs146
2 files changed, 107 insertions, 52 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 9504a6fa4..db63601a0 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,15 +10,18 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytesInner;
use crate::service::handle_request;
+use crate::service::http_general_trace;
use crate::service::http_trace;
use crate::service::HttpRecord;
use crate::service::HttpRecordResponse;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::HttpServerState;
+use crate::service::SignallingRc;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::serde_v8::from_v8;
@@ -924,7 +927,7 @@ where
struct HttpLifetime {
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
- server_state: Rc<HttpServerState>,
+ server_state: SignallingRc<HttpServerState>,
}
struct HttpJoinHandle {
@@ -932,7 +935,7 @@ struct HttpJoinHandle {
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
- server_state: Rc<HttpServerState>,
+ server_state: SignallingRc<HttpServerState>,
}
impl HttpJoinHandle {
@@ -1179,6 +1182,7 @@ pub async fn op_http_close(
.take::<HttpJoinHandle>(rid)?;
if graceful {
+ http_general_trace!("graceful shutdown");
// TODO(bartlomieju): replace with `state.feature_checker.check_or_exit`
// once we phase out `check_or_exit_with_legacy_fallback`
state
@@ -1191,8 +1195,9 @@ pub async fn op_http_close(
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
join_handle.listen_cancel_handle().cancel();
- join_handle.server_state.drain().await;
+ poll_fn(|cx| join_handle.server_state.poll_complete(cx)).await;
} else {
+ http_general_trace!("forceful shutdown");
// In a forceful shutdown, we close everything
join_handle.listen_cancel_handle().cancel();
join_handle.connection_cancel_handle().cancel();
@@ -1200,6 +1205,8 @@ pub async fn op_http_close(
tokio::task::yield_now().await;
}
+ http_general_trace!("awaiting shutdown");
+
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
.borrow_mut()
.await;
diff --git a/ext/http/service.rs b/ext/http/service.rs
index fbd533cac..c232962be 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -17,6 +17,7 @@ use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
use scopeguard::ScopeGuard;
+use std::cell::Cell;
use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
@@ -31,12 +32,34 @@ use std::task::Waker;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<HttpRecordResponse>;
+#[cfg(feature = "__http_tracing")]
+pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
+ std::sync::atomic::AtomicUsize::new(0);
+
+macro_rules! http_general_trace {
+ ($($args:expr),*) => {
+ #[cfg(feature = "__http_tracing")]
+ {
+ let count = $crate::service::RECORD_COUNT
+ .load(std::sync::atomic::Ordering::SeqCst);
+
+ println!(
+ "HTTP [+{count}]: {}",
+ format!($($args),*),
+ );
+ }
+ };
+}
+
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
#[cfg(feature = "__http_tracing")]
{
+ let count = $crate::service::RECORD_COUNT
+ .load(std::sync::atomic::Ordering::SeqCst);
+
println!(
- "HTTP id={:p} strong={}: {}",
+ "HTTP [+{count}] id={:p} strong={}: {}",
$record,
std::rc::Rc::strong_count(&$record),
format!($($args),*),
@@ -45,44 +68,83 @@ macro_rules! http_trace {
};
}
+pub(crate) use http_general_trace;
pub(crate) use http_trace;
-struct HttpServerStateInner {
+pub(crate) struct HttpServerStateInner {
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
- drain_waker: Option<Waker>,
}
-pub struct HttpServerState(RefCell<HttpServerStateInner>);
+/// A signalling version of `Rc` that allows one to poll for when all other references
+/// to the `Rc` have been dropped.
+#[repr(transparent)]
+pub(crate) struct SignallingRc<T>(Rc<(T, Cell<Option<Waker>>)>);
-impl HttpServerState {
- pub fn new() -> Rc<Self> {
- Rc::new(Self(RefCell::new(HttpServerStateInner {
- pool: Vec::new(),
- drain_waker: None,
- })))
+impl<T> SignallingRc<T> {
+ #[inline]
+ pub fn new(t: T) -> Self {
+ Self(Rc::new((t, Default::default())))
}
- pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
- struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);
+ #[inline]
+ pub fn strong_count(&self) -> usize {
+ Rc::strong_count(&self.0)
+ }
- impl<'a> Future for HttpServerStateDrain<'a> {
- type Output = ();
+ /// Resolves when this is the only remaining reference.
+ #[inline]
+ pub fn poll_complete(&self, cx: &mut Context<'_>) -> Poll<()> {
+ if Rc::strong_count(&self.0) == 1 {
+ Poll::Ready(())
+ } else {
+ self.0 .1.set(Some(cx.waker().clone()));
+ Poll::Pending
+ }
+ }
+}
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Self::Output> {
- let server_state = self.0;
- http_trace!(server_state, "HttpServerState::drain poll");
- if Rc::strong_count(server_state) <= 1 {
- return Poll::Ready(());
- }
- server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone());
- Poll::Pending
+impl<T> Clone for SignallingRc<T> {
+ #[inline]
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+impl<T> Drop for SignallingRc<T> {
+ #[inline]
+ fn drop(&mut self) {
+ // Trigger the waker iff the refcount is about to become 1.
+ if Rc::strong_count(&self.0) == 2 {
+ if let Some(waker) = self.0 .1.take() {
+ waker.wake();
}
}
+ }
+}
+
+impl<T> std::ops::Deref for SignallingRc<T> {
+ type Target = T;
+ #[inline]
+ fn deref(&self) -> &Self::Target {
+ &self.0 .0
+ }
+}
+
+pub(crate) struct HttpServerState(RefCell<HttpServerStateInner>);
+
+impl HttpServerState {
+ pub fn new() -> SignallingRc<Self> {
+ SignallingRc::new(Self(RefCell::new(HttpServerStateInner {
+ pool: Vec::new(),
+ })))
+ }
+}
+
+impl std::ops::Deref for HttpServerState {
+ type Target = RefCell<HttpServerStateInner>;
- HttpServerStateDrain(self)
+ fn deref(&self) -> &Self::Target {
+ &self.0
}
}
@@ -117,7 +179,7 @@ impl Drop for HttpRequestBodyAutocloser {
pub async fn handle_request(
request: Request,
request_info: HttpConnectionProperties,
- server_state: Rc<HttpServerState>, // Keep server alive for duration of this future.
+ server_state: SignallingRc<HttpServerState>, // Keep server alive for duration of this future.
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> Result<Response, hyper::Error> {
// If the underlying TCP connection is closed, this future will be dropped
@@ -145,7 +207,7 @@ pub async fn handle_request(
}
struct HttpRecordInner {
- server_state: Rc<HttpServerState>,
+ server_state: SignallingRc<HttpServerState>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@@ -164,17 +226,13 @@ struct HttpRecordInner {
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
#[cfg(feature = "__http_tracing")]
-pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
- std::sync::atomic::AtomicUsize::new(0);
-
-#[cfg(feature = "__http_tracing")]
impl Drop for HttpRecord {
fn drop(&mut self) {
- let count = RECORD_COUNT
+ RECORD_COUNT
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
.checked_sub(1)
.expect("Count went below zero");
- println!("HTTP count={count}: HttpRecord::drop");
+ http_general_trace!("HttpRecord::drop");
}
}
@@ -182,13 +240,13 @@ impl HttpRecord {
fn new(
request: Request,
request_info: HttpConnectionProperties,
- server_state: Rc<HttpServerState>,
+ server_state: SignallingRc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let request_body = Some(request_body.into());
let (mut response_parts, _) = http::Response::new(()).into_parts();
let record =
- if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
+ if let Some((record, headers)) = server_state.borrow_mut().pool.pop() {
response_parts.headers = headers;
http_trace!(record, "HttpRecord::reuse");
record
@@ -262,23 +320,13 @@ impl HttpRecord {
..
} = self.0.borrow_mut().take().unwrap();
- let mut server_state_mut = server_state.0.borrow_mut();
- let inflight = Rc::strong_count(&server_state);
+ let inflight = server_state.strong_count();
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
- // Server is shutting down so wake the drain future.
- if let Some(waker) = server_state_mut.drain_waker.take() {
- drop(server_state_mut);
- drop(server_state);
- http_trace!(self, "HttpRecord::recycle wake");
- waker.wake();
- return;
- }
-
// Keep a buffer of allocations on hand to be reused by incoming requests.
// Estimated target size is 16 + 1/8 the number of inflight requests.
let target = 16 + (inflight >> 3);
- let pool = &mut server_state_mut.pool;
+ let pool = &mut server_state.borrow_mut().pool;
if target > pool.len() {
headers.clear();
pool.push((self, headers));
@@ -634,7 +682,7 @@ mod tests {
.await
},
)?;
- assert_eq!(Rc::strong_count(&server_state_check), 1);
+ assert_eq!(server_state_check.strong_count(), 1);
Ok(())
}
}