summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-09-11 18:06:38 -0600
committerGitHub <noreply@github.com>2023-09-12 00:06:38 +0000
commit950e0e9cd65bd634d59fe60d5a0cb8651958c7fb (patch)
tree400dd0eaacf7ce1018a07e2e232e6d62f54112ad /ext/http/http_next.rs
parentbfd230fd78ef7867eec75e6b21715f51e72e7a23 (diff)
fix(ext/http): create a graceful shutdown API (#20387)
This PR implements a graceful shutdown API for Deno.serve, allowing all current connections to drain from the server before shutting down, while preventing new connections from being started or new transactions on existing connections from being created. We split the cancellation handle into two parts: a listener handle, and a connection handle. A graceful shutdown cancels the listener only, while allowing the connections to drain. The connection handle aborts all futures. If the listener handle is cancelled, we put the connections into graceful shutdown mode, which disables keep-alive on http/1.1 and uses http/2 mechanisms for http/2 connections. In addition, we now guarantee that all connections are complete or cancelled, and all resources are cleaned up when the server `finished` promise resolves -- we use a Rust-side server refcount for this. Performance impact: does not appear to affect basic serving performance by more than 1% (~126k -> ~125k) --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs248
1 files changed, 189 insertions, 59 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 476a55a80..94f6f1241 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
+use crate::slab::http_trace;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
use crate::slab::slab_insert;
use crate::slab::HttpRequestBodyAutocloser;
+use crate::slab::RefCount;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
@@ -70,6 +72,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
+use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -690,7 +693,10 @@ pub async fn op_http_track(
.resource_table
.get::<HttpJoinHandle>(server_rid)?;
- match handle.or_cancel(join_handle.cancel_handle()).await {
+ match handle
+ .or_cancel(join_handle.connection_cancel_handle())
+ .await
+ {
Ok(true) => Ok(()),
Ok(false) => {
Err(AnyError::msg("connection closed before message completed"))
@@ -705,14 +711,17 @@ pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> {
- let index = slab_insert(request, request_info);
+ let index = slab_insert(request, request_info, refcount);
let rx = slab_get(index).promise();
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
+ http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
rx.await;
+ http_trace!(index, "SlabFuture complete");
}
})
}
@@ -745,45 +754,75 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
.keep_alive(true)
.writev(*USE_WRITEV)
- .serve_connection(TokioIo::new(io), svc);
-
- conn.with_upgrades().map_err(AnyError::from)
+ .serve_connection(TokioIo::new(io), svc)
+ .with_upgrades();
+
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
- conn.map_err(AnyError::from)
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -792,33 +831,46 @@ fn serve_https(
// based on the prefix bytes
let handshake = io.get_ref().1.alpn_protocol();
if handshake == Some(TLS_ALPN_HTTP_2) {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else if handshake == Some(TLS_ALPN_HTTP_11) {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http2_autodetect(io, svc).await
+ serve_http2_autodetect(io, svc, listen_cancel_handle).await
}
}
- .try_or_cancel(cancel),
+ .try_or_cancel(connection_cancel_handle),
)
}
fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
- spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
+ spawn(
+ serve_http2_autodetect(io, svc, listen_cancel_handle)
+ .try_or_cancel(connection_cancel_handle),
+ )
}
fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>>
where
@@ -831,28 +883,58 @@ where
match network_stream {
NetworkStream::Tcp(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
NetworkStream::Tls(conn) => {
- serve_https(conn, connection_properties, cancel, tx)
+ serve_https(conn, connection_properties, lifetime, tx)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
}
}
-struct HttpJoinHandle(
- AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
- // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
- Rc<CancelHandle>,
- AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
-);
+#[derive(Clone)]
+struct HttpLifetime {
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ refcount: RefCount,
+}
+
+struct HttpJoinHandle {
+ join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ refcount: RefCount,
+}
impl HttpJoinHandle {
- fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
- self.1.clone()
+ fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ Self {
+ join_handle: AsyncRefCell::new(None),
+ connection_cancel_handle: CancelHandle::new_rc(),
+ listen_cancel_handle: CancelHandle::new_rc(),
+ rx: AsyncRefCell::new(rx),
+ refcount: RefCount::default(),
+ }
+ }
+
+ fn lifetime(self: &Rc<Self>) -> HttpLifetime {
+ HttpLifetime {
+ connection_cancel_handle: self.connection_cancel_handle.clone(),
+ listen_cancel_handle: self.listen_cancel_handle.clone(),
+ refcount: self.refcount.clone(),
+ }
+ }
+
+ fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.connection_cancel_handle.clone()
+ }
+
+ fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.listen_cancel_handle.clone()
}
}
@@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle {
}
fn close(self: Rc<Self>) {
- self.1.cancel()
+ // During a close operation, we cancel everything
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
impl Drop for HttpJoinHandle {
fn drop(&mut self) {
// In some cases we may be dropped without closing, so let's cancel everything on the way out
- self.1.cancel();
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
@@ -890,23 +975,21 @@ where
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
- let cancel_clone = resource.cancel_handle();
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
+ let listen_cancel_clone = resource.listen_cancel_handle();
+
+ let lifetime = resource.lifetime();
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
- .try_or_cancel(cancel_clone.clone())
+ .try_or_cancel(listen_cancel_clone.clone())
.await?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
- cancel_clone.clone(),
+ lifetime.clone(),
tx.clone(),
);
}
@@ -915,7 +998,7 @@ where
});
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -943,22 +1026,18 @@ where
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>(
connection,
&listen_properties,
- resource.cancel_handle(),
+ resource.lifetime(),
tx,
);
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
// If join handle is somehow locked, just abort.
let Some(mut handle) =
- RcRef::map(&join_handle, |this| &this.2).try_borrow_mut()
+ RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
return SlabId::MAX;
};
@@ -1006,9 +1085,9 @@ pub async fn op_http_wait(
.resource_table
.get::<HttpJoinHandle>(rid)?;
- let cancel = join_handle.cancel_handle();
+ let cancel = join_handle.listen_cancel_handle();
let next = async {
- let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
+ let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
recv.recv().await
}
.or_cancel(cancel)
@@ -1021,19 +1100,13 @@ pub async fn op_http_wait(
}
// No - we're shutting down
- let res = RcRef::map(join_handle, |this| &this.0)
+ let res = RcRef::map(join_handle, |this| &this.join_handle)
.borrow_mut()
.await
.take()
.unwrap()
.await?;
- // Drop the cancel and join handles
- state
- .borrow_mut()
- .resource_table
- .take::<HttpJoinHandle>(rid)?;
-
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
if let Some(err) = err.source() {
@@ -1049,6 +1122,63 @@ pub async fn op_http_wait(
Ok(SlabId::MAX)
}
+/// Cancels the HTTP handle.
+#[op2(fast)]
+pub fn op_http_cancel(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ Ok(())
+}
+
+#[op2(async)]
+pub async fn op_http_close(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state
+ .borrow_mut()
+ .resource_table
+ .take::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ deno_net::check_unstable2(&state, "Deno.Server.shutdown");
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ // Async spin on the refcount while we wait for everything to drain
+ while Rc::strong_count(&join_handle.refcount.0) > 1 {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+
+ let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
+ .borrow_mut()
+ .await;
+ if let Some(join_handle) = join_handle.take() {
+ join_handle.await??;
+ }
+
+ Ok(())
+}
+
struct UpgradeStream {
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,