summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/http/00_serve.js27
-rw-r--r--ext/http/http_next.rs248
-rw-r--r--ext/http/lib.rs2
-rw-r--r--ext/http/slab.rs32
4 files changed, 244 insertions, 65 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 914205889..aeebca93d 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -4,7 +4,7 @@ const core = globalThis.Deno.core;
const primordials = globalThis.__bootstrap.primordials;
const internals = globalThis.__bootstrap.internals;
-const { BadResourcePrototype } = core;
+const { BadResourcePrototype, InterruptedPrototype } = core;
import { InnerBody } from "ext:deno_fetch/22_body.js";
import { Event } from "ext:deno_web/02_event.js";
import {
@@ -65,6 +65,8 @@ const {
op_http_upgrade_websocket_next,
op_http_try_wait,
op_http_wait,
+ op_http_cancel,
+ op_http_close,
} = core.ensureFastOps();
const _upgraded = Symbol("_upgraded");
@@ -334,11 +336,15 @@ class CallbackContext {
fallbackHost;
serverRid;
closed;
+ closing;
constructor(signal, args) {
+ // The abort signal triggers a non-graceful shutdown
signal?.addEventListener(
"abort",
- () => this.close(),
+ () => {
+ op_http_cancel(this.serverRid, false);
+ },
{ once: true },
);
this.abortController = new AbortController();
@@ -630,6 +636,9 @@ function serveHttpOn(context, callback) {
if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
break;
}
+ if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) {
+ break;
+ }
throw new Deno.errors.Http(error);
}
if (req === -1) {
@@ -637,10 +646,24 @@ function serveHttpOn(context, callback) {
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
+
+ if (!context.closed && !context.closing) {
+ context.closed = true;
+ await op_http_close(rid, false);
+ context.close();
+ }
})();
return {
finished,
+ async shutdown() {
+ if (!context.closed && !context.closing) {
+ // Shut this HTTP server down gracefully
+ context.closing = true;
+ await op_http_close(context.serverRid, true);
+ context.closed = true;
+ }
+ },
then() {
throw new Error(
"Deno.serve no longer returns a promise. await server.finished instead of server.",
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 476a55a80..94f6f1241 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
+use crate::slab::http_trace;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
use crate::slab::slab_insert;
use crate::slab::HttpRequestBodyAutocloser;
+use crate::slab::RefCount;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
@@ -70,6 +72,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
+use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -690,7 +693,10 @@ pub async fn op_http_track(
.resource_table
.get::<HttpJoinHandle>(server_rid)?;
- match handle.or_cancel(join_handle.cancel_handle()).await {
+ match handle
+ .or_cancel(join_handle.connection_cancel_handle())
+ .await
+ {
Ok(true) => Ok(()),
Ok(false) => {
Err(AnyError::msg("connection closed before message completed"))
@@ -705,14 +711,17 @@ pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> {
- let index = slab_insert(request, request_info);
+ let index = slab_insert(request, request_info, refcount);
let rx = slab_get(index).promise();
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
+ http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
rx.await;
+ http_trace!(index, "SlabFuture complete");
}
})
}
@@ -745,45 +754,75 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
.keep_alive(true)
.writev(*USE_WRITEV)
- .serve_connection(TokioIo::new(io), svc);
-
- conn.with_upgrades().map_err(AnyError::from)
+ .serve_connection(TokioIo::new(io), svc)
+ .with_upgrades();
+
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
- conn.map_err(AnyError::from)
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -792,33 +831,46 @@ fn serve_https(
// based on the prefix bytes
let handshake = io.get_ref().1.alpn_protocol();
if handshake == Some(TLS_ALPN_HTTP_2) {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else if handshake == Some(TLS_ALPN_HTTP_11) {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http2_autodetect(io, svc).await
+ serve_http2_autodetect(io, svc, listen_cancel_handle).await
}
}
- .try_or_cancel(cancel),
+ .try_or_cancel(connection_cancel_handle),
)
}
fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
- spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
+ spawn(
+ serve_http2_autodetect(io, svc, listen_cancel_handle)
+ .try_or_cancel(connection_cancel_handle),
+ )
}
fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>>
where
@@ -831,28 +883,58 @@ where
match network_stream {
NetworkStream::Tcp(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
NetworkStream::Tls(conn) => {
- serve_https(conn, connection_properties, cancel, tx)
+ serve_https(conn, connection_properties, lifetime, tx)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
}
}
-struct HttpJoinHandle(
- AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
- // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
- Rc<CancelHandle>,
- AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
-);
+#[derive(Clone)]
+struct HttpLifetime {
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ refcount: RefCount,
+}
+
+struct HttpJoinHandle {
+ join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ refcount: RefCount,
+}
impl HttpJoinHandle {
- fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
- self.1.clone()
+ fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ Self {
+ join_handle: AsyncRefCell::new(None),
+ connection_cancel_handle: CancelHandle::new_rc(),
+ listen_cancel_handle: CancelHandle::new_rc(),
+ rx: AsyncRefCell::new(rx),
+ refcount: RefCount::default(),
+ }
+ }
+
+ fn lifetime(self: &Rc<Self>) -> HttpLifetime {
+ HttpLifetime {
+ connection_cancel_handle: self.connection_cancel_handle.clone(),
+ listen_cancel_handle: self.listen_cancel_handle.clone(),
+ refcount: self.refcount.clone(),
+ }
+ }
+
+ fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.connection_cancel_handle.clone()
+ }
+
+ fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.listen_cancel_handle.clone()
}
}
@@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle {
}
fn close(self: Rc<Self>) {
- self.1.cancel()
+ // During a close operation, we cancel everything
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
impl Drop for HttpJoinHandle {
fn drop(&mut self) {
// In some cases we may be dropped without closing, so let's cancel everything on the way out
- self.1.cancel();
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
@@ -890,23 +975,21 @@ where
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
- let cancel_clone = resource.cancel_handle();
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
+ let listen_cancel_clone = resource.listen_cancel_handle();
+
+ let lifetime = resource.lifetime();
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
- .try_or_cancel(cancel_clone.clone())
+ .try_or_cancel(listen_cancel_clone.clone())
.await?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
- cancel_clone.clone(),
+ lifetime.clone(),
tx.clone(),
);
}
@@ -915,7 +998,7 @@ where
});
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -943,22 +1026,18 @@ where
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>(
connection,
&listen_properties,
- resource.cancel_handle(),
+ resource.lifetime(),
tx,
);
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
// If join handle is somehow locked, just abort.
let Some(mut handle) =
- RcRef::map(&join_handle, |this| &this.2).try_borrow_mut()
+ RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
return SlabId::MAX;
};
@@ -1006,9 +1085,9 @@ pub async fn op_http_wait(
.resource_table
.get::<HttpJoinHandle>(rid)?;
- let cancel = join_handle.cancel_handle();
+ let cancel = join_handle.listen_cancel_handle();
let next = async {
- let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
+ let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
recv.recv().await
}
.or_cancel(cancel)
@@ -1021,19 +1100,13 @@ pub async fn op_http_wait(
}
// No - we're shutting down
- let res = RcRef::map(join_handle, |this| &this.0)
+ let res = RcRef::map(join_handle, |this| &this.join_handle)
.borrow_mut()
.await
.take()
.unwrap()
.await?;
- // Drop the cancel and join handles
- state
- .borrow_mut()
- .resource_table
- .take::<HttpJoinHandle>(rid)?;
-
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
if let Some(err) = err.source() {
@@ -1049,6 +1122,63 @@ pub async fn op_http_wait(
Ok(SlabId::MAX)
}
+/// Cancels the HTTP handle.
+#[op2(fast)]
+pub fn op_http_cancel(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ Ok(())
+}
+
+#[op2(async)]
+pub async fn op_http_close(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state
+ .borrow_mut()
+ .resource_table
+ .take::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ deno_net::check_unstable2(&state, "Deno.Server.shutdown");
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ // Async spin on the refcount while we wait for everything to drain
+ while Rc::strong_count(&join_handle.refcount.0) > 1 {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+
+ let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
+ .borrow_mut()
+ .await;
+ if let Some(join_handle) = join_handle.take() {
+ join_handle.await??;
+ }
+
+ Ok(())
+}
+
struct UpgradeStream {
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 93ea0895e..719dcd6de 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -126,6 +126,8 @@ deno_core::extension!(
http_next::op_can_write_vectored,
http_next::op_http_try_wait,
http_next::op_http_wait,
+ http_next::op_http_close,
+ http_next::op_http_cancel,
],
esm = ["00_serve.js", "01_http.js"],
);
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
index 8dd562cc2..4718aded1 100644
--- a/ext/http/slab.rs
+++ b/ext/http/slab.rs
@@ -20,6 +20,10 @@ pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;
+#[repr(transparent)]
+#[derive(Clone, Default)]
+pub struct RefCount(pub Rc<()>);
+
enum RequestBodyState {
Incoming(Incoming),
Resource(HttpRequestBodyAutocloser),
@@ -50,24 +54,27 @@ pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<RequestBodyState>,
- // The response may get taken before we tear this down
+ /// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
trailers: Rc<RefCell<Option<HeaderMap>>>,
been_dropped: bool,
+ /// Use a `Rc` to keep track of outstanding requests. We don't use this, but
+ /// when it drops, it decrements the refcount of the server itself.
+ refcount: Option<RefCount>,
#[cfg(feature = "__zombie_http_tracking")]
alive: bool,
}
thread_local! {
- static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
+ pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
}
macro_rules! http_trace {
($index:expr, $args:tt) => {
#[cfg(feature = "__http_tracing")]
{
- let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
+ let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len()));
if let Ok(total) = total {
println!("HTTP id={} total={}: {}", $index, total, format!($args));
} else {
@@ -77,6 +84,8 @@ macro_rules! http_trace {
};
}
+pub(crate) use http_trace;
+
/// Hold a lock on the slab table and a reference to one entry in the table.
pub struct SlabEntry(
NonNull<HttpSlabRecord>,
@@ -121,6 +130,7 @@ fn slab_insert_raw(
request_parts: Parts,
request_body: Option<Incoming>,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
) -> SlabId {
let index = SLAB.with(|slab| {
let mut slab = slab.borrow_mut();
@@ -135,6 +145,7 @@ fn slab_insert_raw(
trailers,
been_dropped: false,
promise: CompletionHandle::default(),
+ refcount: Some(refcount),
#[cfg(feature = "__zombie_http_tracking")]
alive: true,
})
@@ -146,9 +157,10 @@ fn slab_insert_raw(
pub fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
) -> SlabId {
let (request_parts, request_body) = request.into_parts();
- slab_insert_raw(request_parts, Some(request_body), request_info)
+ slab_insert_raw(request_parts, Some(request_body), request_info, refcount)
}
pub fn slab_drop(index: SlabId) {
@@ -159,10 +171,21 @@ pub fn slab_drop(index: SlabId) {
!record.been_dropped,
"HTTP state error: Entry has already been dropped"
);
+
+ // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND
+ // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished
+ // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which
+ // might include actual resources, and the refcount, which is keeping the server alive.
record.been_dropped = true;
if record.promise.is_completed() {
drop(entry);
slab_expunge(index);
+ } else {
+ // Take the request body, as the future has been dropped and this will allow some resources to close
+ record.request_body.take();
+ // Take the refcount keeping the server alive. The future is no longer alive, which means this request
+ // is toast.
+ record.refcount.take();
}
}
@@ -318,6 +341,7 @@ mod tests {
local_port: None,
stream_type: NetworkStreamType::Tcp,
},
+ RefCount::default(),
);
let entry = slab_get(id);
entry.complete();