Diffstat (limited to 'ext')
 ext/http/00_serve.js  |  27
 ext/http/http_next.rs | 248
 ext/http/lib.rs       |   2
 ext/http/slab.rs      |  32
4 files changed, 244 insertions, 65 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 914205889..aeebca93d 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -4,7 +4,7 @@
 const core = globalThis.Deno.core;
 const primordials = globalThis.__bootstrap.primordials;
 const internals = globalThis.__bootstrap.internals;
-const { BadResourcePrototype } = core;
+const { BadResourcePrototype, InterruptedPrototype } = core;
 import { InnerBody } from "ext:deno_fetch/22_body.js";
 import { Event } from "ext:deno_web/02_event.js";
 import {
@@ -65,6 +65,8 @@ const {
   op_http_upgrade_websocket_next,
   op_http_try_wait,
   op_http_wait,
+  op_http_cancel,
+  op_http_close,
 } = core.ensureFastOps();
 
 const _upgraded = Symbol("_upgraded");
@@ -334,11 +336,15 @@ class CallbackContext {
   fallbackHost;
   serverRid;
   closed;
+  closing;
 
   constructor(signal, args) {
+    // The abort signal triggers a non-graceful shutdown
     signal?.addEventListener(
       "abort",
-      () => this.close(),
+      () => {
+        op_http_cancel(this.serverRid, false);
+      },
       { once: true },
     );
     this.abortController = new AbortController();
@@ -630,6 +636,9 @@ function serveHttpOn(context, callback) {
         if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
           break;
         }
+        if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) {
+          break;
+        }
         throw new Deno.errors.Http(error);
       }
       if (req === -1) {
@@ -637,10 +646,24 @@
       }
       PromisePrototypeCatch(callback(req), promiseErrorHandler);
     }
+
+    if (!context.closed && !context.closing) {
+      context.closed = true;
+      await op_http_close(rid, false);
+      context.close();
+    }
   })();
 
   return {
     finished,
+    async shutdown() {
+      if (!context.closed && !context.closing) {
+        // Shut this HTTP server down gracefully
+        context.closing = true;
+        await op_http_close(context.serverRid, true);
+        context.closed = true;
+      }
+    },
     then() {
       throw new Error(
         "Deno.serve no longer returns a promise. await server.finished instead of server.",
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 476a55a80..94f6f1241 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor;
 use crate::response_body::Compression;
 use crate::response_body::ResponseBytes;
 use crate::response_body::ResponseBytesInner;
+use crate::slab::http_trace;
 use crate::slab::slab_drop;
 use crate::slab::slab_get;
 use crate::slab::slab_init;
 use crate::slab::slab_insert;
 use crate::slab::HttpRequestBodyAutocloser;
+use crate::slab::RefCount;
 use crate::slab::SlabId;
 use crate::websocket_upgrade::WebSocketUpgrade;
 use crate::LocalExecutor;
@@ -70,6 +72,7 @@
 use std::future::Future;
 use std::io;
 use std::pin::Pin;
 use std::rc::Rc;
+use std::time::Duration;
 use tokio::io::AsyncReadExt;
 use tokio::io::AsyncWriteExt;
@@ -690,7 +693,10 @@ pub async fn op_http_track(
     .resource_table
     .get::<HttpJoinHandle>(server_rid)?;
 
-  match handle.or_cancel(join_handle.cancel_handle()).await {
+  match handle
+    .or_cancel(join_handle.connection_cancel_handle())
+    .await
+  {
     Ok(true) => Ok(()),
     Ok(false) => {
       Err(AnyError::msg("connection closed before message completed"))
@@ -705,14 +711,17 @@ pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
 pub fn new_slab_future(
   request: Request,
   request_info: HttpConnectionProperties,
+  refcount: RefCount,
   tx: tokio::sync::mpsc::Sender<SlabId>,
 ) -> SlabFuture<impl Future<Output = ()>> {
-  let index = slab_insert(request, request_info);
+  let index = slab_insert(request, request_info, refcount);
   let rx = slab_get(index).promise();
   SlabFuture(index, async move {
     if tx.send(index).await.is_ok() {
+      http_trace!(index, "SlabFuture await");
       // We only need to wait for completion if we aren't closed
       rx.await;
+      http_trace!(index, "SlabFuture complete");
     }
   })
 }
@@ -745,45 +754,75 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
 fn serve_http11_unconditional(
   io: impl HttpServeStream,
   svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+  cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
   let conn = http1::Builder::new()
     .keep_alive(true)
     .writev(*USE_WRITEV)
-    .serve_connection(TokioIo::new(io), svc);
-
-  conn.with_upgrades().map_err(AnyError::from)
+    .serve_connection(TokioIo::new(io), svc)
+    .with_upgrades();
+
+  async {
+    match conn.or_abort(cancel).await {
+      Err(mut conn) => {
+        Pin::new(&mut conn).graceful_shutdown();
+        conn.await
+      }
+      Ok(res) => res,
+    }
+  }
 }
 
 fn serve_http2_unconditional(
   io: impl HttpServeStream,
   svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+  cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
   let conn =
     http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
-  conn.map_err(AnyError::from)
+  async {
+    match conn.or_abort(cancel).await {
+      Err(mut conn) => {
+        Pin::new(&mut conn).graceful_shutdown();
+        conn.await
+      }
+      Ok(res) => res,
+    }
+  }
 }
 
 async fn serve_http2_autodetect(
   io: impl HttpServeStream,
   svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+  cancel: Rc<CancelHandle>,
 ) -> Result<(), AnyError> {
   let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
   let (matches, io) = prefix.match_prefix().await?;
   if matches {
-    serve_http2_unconditional(io, svc).await
+    serve_http2_unconditional(io, svc, cancel)
+      .await
+      .map_err(|e| e.into())
   } else {
-    serve_http11_unconditional(io, svc).await
+    serve_http11_unconditional(io, svc, cancel)
+      .await
+      .map_err(|e| e.into())
   }
 }
 
 fn serve_https(
   mut io: TlsStream,
   request_info: HttpConnectionProperties,
-  cancel: Rc<CancelHandle>,
+  lifetime: HttpLifetime,
   tx: tokio::sync::mpsc::Sender<SlabId>,
 ) -> JoinHandle<Result<(), AnyError>> {
+  let HttpLifetime {
+    refcount,
+    connection_cancel_handle,
+    listen_cancel_handle,
+  } = lifetime;
+
   let svc = service_fn(move |req: Request| {
-    new_slab_future(req, request_info.clone(), tx.clone())
+    new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
   });
   spawn(
     async {
@@ -792,33 +831,46 @@ fn serve_https(
       // based on the prefix bytes
       let handshake = io.get_ref().1.alpn_protocol();
       if handshake == Some(TLS_ALPN_HTTP_2) {
-        serve_http2_unconditional(io, svc).await
+        serve_http2_unconditional(io, svc, listen_cancel_handle)
+          .await
+          .map_err(|e| e.into())
       } else if handshake == Some(TLS_ALPN_HTTP_11) {
-        serve_http11_unconditional(io, svc).await
+        serve_http11_unconditional(io, svc, listen_cancel_handle)
+          .await
+          .map_err(|e| e.into())
       } else {
-        serve_http2_autodetect(io, svc).await
+        serve_http2_autodetect(io, svc, listen_cancel_handle).await
       }
     }
-    .try_or_cancel(cancel),
+    .try_or_cancel(connection_cancel_handle),
   )
 }
 
 fn serve_http(
   io: impl HttpServeStream,
   request_info: HttpConnectionProperties,
-  cancel: Rc<CancelHandle>,
+  lifetime: HttpLifetime,
   tx: tokio::sync::mpsc::Sender<SlabId>,
 ) -> JoinHandle<Result<(), AnyError>> {
+  let HttpLifetime {
+    refcount,
+    connection_cancel_handle,
+    listen_cancel_handle,
+  } = lifetime;
+
   let svc = service_fn(move |req: Request| {
-    new_slab_future(req, request_info.clone(), tx.clone())
+    new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
   });
-  spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
+  spawn(
+    serve_http2_autodetect(io, svc, listen_cancel_handle)
+      .try_or_cancel(connection_cancel_handle),
+  )
 }
 
 fn serve_http_on<HTTP>(
   connection: HTTP::Connection,
   listen_properties: &HttpListenProperties,
-  cancel: Rc<CancelHandle>,
+  lifetime: HttpLifetime,
   tx: tokio::sync::mpsc::Sender<SlabId>,
 ) -> JoinHandle<Result<(), AnyError>>
 where
@@ -831,28 +883,58 @@
   match network_stream {
     NetworkStream::Tcp(conn) => {
-      serve_http(conn, connection_properties, cancel, tx)
+      serve_http(conn, connection_properties, lifetime, tx)
     }
     NetworkStream::Tls(conn) => {
-      serve_https(conn, connection_properties, cancel, tx)
+      serve_https(conn, connection_properties, lifetime, tx)
     }
     #[cfg(unix)]
     NetworkStream::Unix(conn) => {
-      serve_http(conn, connection_properties, cancel, tx)
+      serve_http(conn, connection_properties, lifetime, tx)
     }
   }
 }
 
-struct HttpJoinHandle(
-  AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
-  // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
-  Rc<CancelHandle>,
-  AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
-);
+#[derive(Clone)]
+struct HttpLifetime {
+  connection_cancel_handle: Rc<CancelHandle>,
+  listen_cancel_handle: Rc<CancelHandle>,
+  refcount: RefCount,
+}
+
+struct HttpJoinHandle {
+  join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+  connection_cancel_handle: Rc<CancelHandle>,
+  listen_cancel_handle: Rc<CancelHandle>,
+  rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+  refcount: RefCount,
+}
 
 impl HttpJoinHandle {
-  fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
-    self.1.clone()
+  fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+    Self {
+      join_handle: AsyncRefCell::new(None),
+      connection_cancel_handle: CancelHandle::new_rc(),
+      listen_cancel_handle: CancelHandle::new_rc(),
+      rx: AsyncRefCell::new(rx),
+      refcount: RefCount::default(),
+    }
+  }
+
+  fn lifetime(self: &Rc<Self>) -> HttpLifetime {
+    HttpLifetime {
+      connection_cancel_handle: self.connection_cancel_handle.clone(),
+      listen_cancel_handle: self.listen_cancel_handle.clone(),
+      refcount: self.refcount.clone(),
+    }
+  }
+
+  fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+    self.connection_cancel_handle.clone()
+  }
+
+  fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+    self.listen_cancel_handle.clone()
   }
 }
@@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle {
   }
 
   fn close(self: Rc<Self>) {
-    self.1.cancel()
+    // During a close operation, we cancel everything
+    self.connection_cancel_handle.cancel();
+    self.listen_cancel_handle.cancel();
   }
 }
 
 impl Drop for HttpJoinHandle {
   fn drop(&mut self) {
     // In some cases we may be dropped without closing, so let's cancel everything on the way out
-    self.1.cancel();
+    self.connection_cancel_handle.cancel();
+    self.listen_cancel_handle.cancel();
   }
 }
@@ -890,23 +975,21 @@ where
   let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
 
   let (tx, rx) = tokio::sync::mpsc::channel(10);
-  let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
-    AsyncRefCell::new(None),
-    CancelHandle::new_rc(),
-    AsyncRefCell::new(rx),
-  ));
-  let cancel_clone = resource.cancel_handle();
+  let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
+  let listen_cancel_clone = resource.listen_cancel_handle();
+
+  let lifetime = resource.lifetime();
 
   let listen_properties_clone: HttpListenProperties = listen_properties.clone();
   let handle = spawn(async move {
     loop {
       let conn = HTTP::accept_connection_from_listener(&listener)
-        .try_or_cancel(cancel_clone.clone())
+        .try_or_cancel(listen_cancel_clone.clone())
        .await?;
       serve_http_on::<HTTP>(
         conn,
         &listen_properties_clone,
-        cancel_clone.clone(),
+        lifetime.clone(),
         tx.clone(),
       );
     }
@@ -915,7 +998,7 @@
   });
 
   // Set the handle after we start the future
-  *RcRef::map(&resource, |this| &this.0)
+  *RcRef::map(&resource, |this| &this.join_handle)
     .try_borrow_mut()
     .unwrap() = Some(handle);
@@ -943,22 +1026,18 @@ where
   let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
 
   let (tx, rx) = tokio::sync::mpsc::channel(10);
-  let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
-    AsyncRefCell::new(None),
-    CancelHandle::new_rc(),
-    AsyncRefCell::new(rx),
-  ));
+  let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
 
   let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
     serve_http_on::<HTTP>(
       connection,
       &listen_properties,
-      resource.cancel_handle(),
+      resource.lifetime(),
      tx,
    );
 
   // Set the handle after we start the future
-  *RcRef::map(&resource, |this| &this.0)
+  *RcRef::map(&resource, |this| &this.join_handle)
     .try_borrow_mut()
     .unwrap() = Some(handle);
@@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
   // If join handle is somehow locked, just abort.
   let Some(mut handle) =
-    RcRef::map(&join_handle, |this| &this.2).try_borrow_mut()
+    RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
   else {
     return SlabId::MAX;
   };
@@ -1006,9 +1085,9 @@ pub async fn op_http_wait(
     .resource_table
     .get::<HttpJoinHandle>(rid)?;
-  let cancel = join_handle.cancel_handle();
+  let cancel = join_handle.listen_cancel_handle();
   let next = async {
-    let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
+    let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
     recv.recv().await
   }
   .or_cancel(cancel)
   .await;
@@ -1021,19 +1100,13 @@
   }
 
   // No - we're shutting down
-  let res = RcRef::map(join_handle, |this| &this.0)
+  let res = RcRef::map(join_handle, |this| &this.join_handle)
     .borrow_mut()
     .await
     .take()
     .unwrap()
     .await?;
 
-  // Drop the cancel and join handles
-  state
-    .borrow_mut()
-    .resource_table
-    .take::<HttpJoinHandle>(rid)?;
-
   // Filter out shutdown (ENOTCONN) errors
   if let Err(err) = res {
     if let Some(err) = err.source() {
@@ -1049,6 +1122,63 @@
   Ok(SlabId::MAX)
 }
 
+/// Cancels the HTTP handle.
+#[op2(fast)]
+pub fn op_http_cancel(
+  state: &mut OpState,
+  #[smi] rid: ResourceId,
+  graceful: bool,
+) -> Result<(), AnyError> {
+  let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
+
+  if graceful {
+    // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+    join_handle.listen_cancel_handle().cancel();
+  } else {
+    // In a forceful shutdown, we close everything
+    join_handle.listen_cancel_handle().cancel();
+    join_handle.connection_cancel_handle().cancel();
+  }
+
+  Ok(())
+}
+
+#[op2(async)]
+pub async fn op_http_close(
+  state: Rc<RefCell<OpState>>,
+  #[smi] rid: ResourceId,
+  graceful: bool,
+) -> Result<(), AnyError> {
+  let join_handle = state
+    .borrow_mut()
+    .resource_table
+    .take::<HttpJoinHandle>(rid)?;
+
+  if graceful {
+    deno_net::check_unstable2(&state, "Deno.Server.shutdown");
+    // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+    join_handle.listen_cancel_handle().cancel();
+  } else {
+    // In a forceful shutdown, we close everything
+    join_handle.listen_cancel_handle().cancel();
+    join_handle.connection_cancel_handle().cancel();
+  }
+
+  // Async spin on the refcount while we wait for everything to drain
+  while Rc::strong_count(&join_handle.refcount.0) > 1 {
+    tokio::time::sleep(Duration::from_millis(10)).await;
+  }
+
+  let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
+    .borrow_mut()
+    .await;
+  if let Some(join_handle) = join_handle.take() {
+    join_handle.await??;
+  }
+
+  Ok(())
+}
+
 struct UpgradeStream {
   read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
   write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 93ea0895e..719dcd6de 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -126,6 +126,8 @@ deno_core::extension!(
     http_next::op_can_write_vectored,
     http_next::op_http_try_wait,
     http_next::op_http_wait,
+    http_next::op_http_close,
+    http_next::op_http_cancel,
   ],
   esm = ["00_serve.js", "01_http.js"],
 );
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
index 8dd562cc2..4718aded1 100644
--- a/ext/http/slab.rs
+++ b/ext/http/slab.rs
@@ -20,6 +20,10 @@ pub type Request = hyper1::Request<Incoming>;
 pub type Response = hyper1::Response<ResponseBytes>;
 pub type SlabId = u32;
 
+#[repr(transparent)]
+#[derive(Clone, Default)]
+pub struct RefCount(pub Rc<()>);
+
 enum RequestBodyState {
   Incoming(Incoming),
   Resource(HttpRequestBodyAutocloser),
@@ -50,24 +54,27 @@ pub struct HttpSlabRecord {
   request_info: HttpConnectionProperties,
   request_parts: Parts,
   request_body: Option<RequestBodyState>,
-  // The response may get taken before we tear this down
+  /// The response may get taken before we tear this down
   response: Option<Response>,
   promise: CompletionHandle,
   trailers: Rc<RefCell<Option<HeaderMap>>>,
   been_dropped: bool,
+  /// Use a `Rc` to keep track of outstanding requests. We don't use this, but
+  /// when it drops, it decrements the refcount of the server itself.
+  refcount: Option<RefCount>,
   #[cfg(feature = "__zombie_http_tracking")]
   alive: bool,
 }
 
 thread_local! {
-  static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
+  pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
 }
 
 macro_rules! http_trace {
   ($index:expr, $args:tt) => {
     #[cfg(feature = "__http_tracing")]
     {
-      let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
+      let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len()));
       if let Ok(total) = total {
         println!("HTTP id={} total={}: {}", $index, total, format!($args));
       } else {
@@ -77,6 +84,8 @@
   };
 }
 
+pub(crate) use http_trace;
+
 /// Hold a lock on the slab table and a reference to one entry in the table.
 pub struct SlabEntry(
   NonNull<HttpSlabRecord>,
@@ -121,6 +130,7 @@ fn slab_insert_raw(
   request_parts: Parts,
   request_body: Option<Incoming>,
   request_info: HttpConnectionProperties,
+  refcount: RefCount,
 ) -> SlabId {
   let index = SLAB.with(|slab| {
     let mut slab = slab.borrow_mut();
@@ -135,6 +145,7 @@
       trailers,
       been_dropped: false,
       promise: CompletionHandle::default(),
+      refcount: Some(refcount),
       #[cfg(feature = "__zombie_http_tracking")]
       alive: true,
     })
@@ -146,9 +157,10 @@
 pub fn slab_insert(
   request: Request,
   request_info: HttpConnectionProperties,
+  refcount: RefCount,
 ) -> SlabId {
   let (request_parts, request_body) = request.into_parts();
-  slab_insert_raw(request_parts, Some(request_body), request_info)
+  slab_insert_raw(request_parts, Some(request_body), request_info, refcount)
 }
 
 pub fn slab_drop(index: SlabId) {
@@ -159,10 +171,21 @@ pub fn slab_drop(index: SlabId) {
     !record.been_dropped,
     "HTTP state error: Entry has already been dropped"
   );
+
+  // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND
+  // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished
+  // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which
+  // might include actual resources, and the refcount, which is keeping the server alive.
   record.been_dropped = true;
   if record.promise.is_completed() {
     drop(entry);
     slab_expunge(index);
+  } else {
+    // Take the request body, as the future has been dropped and this will allow some resources to close
+    record.request_body.take();
+    // Take the refcount keeping the server alive. The future is no longer alive, which means this request
+    // is toast.
+    record.refcount.take();
   }
 }
@@ -318,6 +341,7 @@ mod tests {
         local_port: None,
         stream_type: NetworkStreamType::Tcp,
       },
+      RefCount::default(),
     );
     let entry = slab_get(id);
     entry.complete();
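
The heart of the graceful path is the RefCount(pub Rc<()>) added in slab.rs: every in-flight request holds a clone of the server's Rc, and op_http_close cancels the listener, then spins until only the server's own reference remains before awaiting the join handle. The following is a minimal standalone sketch of that idea, assuming plain Rc and a tokio LocalSet rather than the actual deno_core types; all names here are illustrative.

// Sketch (assumption: not the real ext/http code) of the Rc-based refcount
// drain used by op_http_close. Requires tokio with the "rt" and "time" features.
use std::rc::Rc;
use std::time::Duration;

/// Mirrors `RefCount(pub Rc<()>)` from slab.rs: the server keeps the original,
/// every in-flight request keeps a clone.
#[derive(Clone, Default)]
struct RefCount(Rc<()>);

async fn handle_request(_refcount: RefCount) {
    // While this future is alive it holds one strong count on the Rc.
    tokio::time::sleep(Duration::from_millis(50)).await;
    // Dropping `_refcount` on return releases the count.
}

async fn graceful_shutdown(refcount: &RefCount) {
    // Same spin loop as op_http_close: wait until only the server's own
    // reference remains, i.e. every outstanding request has finished.
    while Rc::strong_count(&refcount.0) > 1 {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

fn main() {
    // Rc is !Send, so run on a single-threaded runtime with a LocalSet,
    // the same shape as deno_core's local task spawning.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();
    let local = tokio::task::LocalSet::new();
    local.block_on(&rt, async {
        let refcount = RefCount::default();
        for _ in 0..3 {
            tokio::task::spawn_local(handle_request(refcount.clone()));
        }
        graceful_shutdown(&refcount).await;
        assert_eq!(Rc::strong_count(&refcount.0), 1);
        println!("all requests drained");
    });
}

Compared with an explicit counter, piggybacking on Rc's strong count means the bookkeeping disappears automatically whenever a request future is dropped, which is what slab_drop relies on when it takes the refcount out of an entry whose future is already gone.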