-rw-r--r--  Cargo.lock                            13
-rw-r--r--  Cargo.toml                             3
-rw-r--r--  cli/Cargo.toml                         1
-rw-r--r--  cli/tests/unit/serve_test.ts         244
-rw-r--r--  cli/tsc/dts/lib.deno.unstable.d.ts    11
-rw-r--r--  ext/http/00_serve.js                  27
-rw-r--r--  ext/http/http_next.rs                248
-rw-r--r--  ext/http/lib.rs                        2
-rw-r--r--  ext/http/slab.rs                      32
9 files changed, 500 insertions(+), 81 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index e199ec6f7..ca0f04048 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -922,7 +922,6 @@ dependencies = [
"tokio",
"tokio-util",
"tower-lsp",
- "tracing",
"trust-dns-client",
"trust-dns-server",
"twox-hash",
@@ -1075,9 +1074,9 @@ dependencies = [
[[package]]
name = "deno_core"
-version = "0.208.0"
+version = "0.209.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aab2b013707b6a1bb1e56404b72a4f68220d0fbe1184133b2b21386a8ffbc5d8"
+checksum = "c48ff1f83aeeda4b8ed9c101b85380fd2f25a52268130546c610c8e412911d7b"
dependencies = [
"anyhow",
"bytes",
@@ -1459,9 +1458,9 @@ dependencies = [
[[package]]
name = "deno_ops"
-version = "0.86.0"
+version = "0.87.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b116802ace73e3dd910081652789c85aa21f057b9f5936255d786965816fb3b1"
+checksum = "573a5ae66f76ce159525ab9007433e19d1a074e32c27b17a4753780d659d79fa"
dependencies = [
"deno-proc-macro-rules",
"lazy-regex 2.5.0",
@@ -4619,9 +4618,9 @@ dependencies = [
[[package]]
name = "serde_v8"
-version = "0.119.0"
+version = "0.120.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85efce3bb967c7cd2be8058f7b06047489e0b0888fc25db9e3aa7907370ae45c"
+checksum = "a5424b4b41a92222abf9ddbdd78f59164f7594422ee4a61fc3704fc8ba608dc6"
dependencies = [
"bytes",
"derive_more",
diff --git a/Cargo.toml b/Cargo.toml
index 1bfab9fa1..c5707de53 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,7 +40,7 @@ repository = "https://github.com/denoland/deno"
[workspace.dependencies]
deno_ast = { version = "0.29.1", features = ["transpiling"] }
-deno_core = { version = "0.208.0" }
+deno_core = { version = "0.209.0" }
deno_runtime = { version = "0.126.0", path = "./runtime" }
napi_sym = { version = "0.48.0", path = "./cli/napi/sym" }
@@ -139,7 +139,6 @@ tar = "=0.4.40"
tempfile = "3.4.0"
termcolor = "1.1.3"
thiserror = "1.0.40"
-tracing = "0"
tokio = { version = "1.28.1", features = ["full"] }
tokio-metrics = { version = "0.3.0", features = ["rt"] }
tokio-rustls = "0.24.0"
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index c08e66180..42f868ef9 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -116,7 +116,6 @@ thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tower-lsp.workspace = true
-tracing.workspace = true
twox-hash = "=1.6.3"
typed-arena = "=2.0.1"
uuid = { workspace = true, features = ["serde"] }
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index d1ac82696..3f58903a8 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -41,27 +41,257 @@ function onListen<T>(
};
}
-Deno.test(async function httpServerShutsDownPortBeforeResolving() {
+async function makeServer(
+ handler: (req: Request) => Response | Promise<Response>,
+): Promise<
+ { finished: Promise<void>; abort: () => void; shutdown: () => Promise<void> }
+> {
const ac = new AbortController();
const listeningPromise = deferred();
const server = Deno.serve({
- handler: (_req) => new Response("ok"),
+ handler,
port: servePort,
signal: ac.signal,
onListen: onListen(listeningPromise),
});
await listeningPromise;
- assertThrows(() => Deno.listen({ port: servePort }));
+ return {
+ finished: server.finished,
+ abort() {
+ ac.abort();
+ },
+ async shutdown() {
+ await server.shutdown();
+ },
+ };
+}
- ac.abort();
- await server.finished;
+Deno.test(async function httpServerShutsDownPortBeforeResolving() {
+ const { finished, abort } = await makeServer((_req) => new Response("ok"));
+ assertThrows(() => Deno.listen({ port: servePort }));
+ abort();
+ await finished;
const listener = Deno.listen({ port: servePort });
listener!.close();
});
+// When shutting down abruptly, we require that all in-progress connections are aborted,
+// no new connections are allowed, and no new transactions are allowed on existing connections.
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerShutdownAbruptGuaranteeHttp11() {
+ const promiseQueue: { input: Deferred<string>; out: Deferred<void> }[] = [];
+ const { finished, abort } = await makeServer((_req) => {
+ const { input, out } = promiseQueue.shift()!;
+ return new Response(
+ new ReadableStream({
+ async start(controller) {
+ controller.enqueue(new Uint8Array([46]));
+ out.resolve(undefined);
+ controller.enqueue(encoder.encode(await input));
+ controller.close();
+ },
+ }),
+ );
+ });
+ const encoder = new TextEncoder();
+ const decoder = new TextDecoder();
+ const conn = await Deno.connect({ port: servePort });
+ const w = conn.writable.getWriter();
+ const r = conn.readable.getReader();
+
+ const deferred1 = { input: deferred<string>(), out: deferred<void>() };
+ promiseQueue.push(deferred1);
+ const deferred2 = { input: deferred<string>(), out: deferred<void>() };
+ promiseQueue.push(deferred2);
+ const deferred3 = { input: deferred<string>(), out: deferred<void>() };
+ promiseQueue.push(deferred3);
+ deferred1.input.resolve("#");
+ deferred2.input.resolve("$");
+ await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
+ await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
+
+ // Fully read two responses
+ let text = "";
+ while (!text.includes("$\r\n")) {
+ text += decoder.decode((await r.read()).value);
+ }
+
+ await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
+ await deferred3.out;
+
+ // This is half served, so wait for the chunk that has the first '.'
+ text = "";
+ while (!text.includes("1\r\n.\r\n")) {
+ text += decoder.decode((await r.read()).value);
+ }
+
+ abort();
+
+ // This doesn't actually write anything, but we release it after aborting
+ deferred3.input.resolve("!");
+
+ // Guarantee: can't connect to an aborted server (though this may not happen immediately)
+ let failed = false;
+ for (let i = 0; i < 10; i++) {
+ try {
+ const conn = await Deno.connect({ port: servePort });
+ conn.close();
+ // Give the runtime a few ticks to settle (required for Windows)
+ await new Promise((r) => setTimeout(r, 2 ** i));
+ continue;
+ } catch (_) {
+ failed = true;
+ break;
+ }
+ }
+ assert(failed, "The Deno.serve listener was not disabled promptly");
+
+ // Guarantee: the pipeline is closed abruptly
+ assert((await r.read()).done);
+
+ try {
+ conn.close();
+ } catch (_) {
+ // Ignore
+ }
+ await finished;
+ },
+);
+
+// When shutting down gracefully, we require that no new connections are accepted and no new
+// transactions are started, while in-progress connections and their pending responses are allowed to drain.
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerShutdownGracefulGuaranteeHttp11() {
+ const promiseQueue: { input: Deferred<string>; out: Deferred<void> }[] = [];
+ const { finished, shutdown } = await makeServer((_req) => {
+ const { input, out } = promiseQueue.shift()!;
+ return new Response(
+ new ReadableStream({
+ async start(controller) {
+ controller.enqueue(new Uint8Array([46]));
+ out.resolve(undefined);
+ controller.enqueue(encoder.encode(await input));
+ controller.close();
+ },
+ }),
+ );
+ });
+ const encoder = new TextEncoder();
+ const decoder = new TextDecoder();
+ const conn = await Deno.connect({ port: servePort });
+ const w = conn.writable.getWriter();
+ const r = conn.readable.getReader();
+
+ const deferred1 = { input: deferred<string>(), out: deferred<void>() };
+ promiseQueue.push(deferred1);
+ const deferred2 = { input: deferred<string>(), out: deferred<void>() };
+ promiseQueue.push(deferred2);
+ const deferred3 = { input: deferred<string>(), out: deferred<void>() };
+ promiseQueue.push(deferred3);
+ deferred1.input.resolve("#");
+ deferred2.input.resolve("$");
+ await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
+ await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
+
+ // Fully read two responses
+ let text = "";
+ while (!text.includes("$\r\n")) {
+ text += decoder.decode((await r.read()).value);
+ }
+
+ await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
+ await deferred3.out;
+
+ // This is half served, so wait for the chunk that has the first '.'
+ text = "";
+ while (!text.includes("1\r\n.\r\n")) {
+ text += decoder.decode((await r.read()).value);
+ }
+
+ const shutdownPromise = shutdown();
+
+ // Release the final response _after_ we shut down
+ deferred3.input.resolve("!");
+
+ // Guarantee: can't connect to a server that is shutting down (though this may not happen immediately)
+ let failed = false;
+ for (let i = 0; i < 10; i++) {
+ try {
+ const conn = await Deno.connect({ port: servePort });
+ conn.close();
+ // Give the runtime a few ticks to settle (required for Windows)
+ await new Promise((r) => setTimeout(r, 2 ** i));
+ continue;
+ } catch (_) {
+ failed = true;
+ break;
+ }
+ }
+ assert(failed, "The Deno.serve listener was not disabled promptly");
+
+ // Guarantee: existing connections fully drain
+ while (!text.includes("!\r\n")) {
+ text += decoder.decode((await r.read()).value);
+ }
+
+ await shutdownPromise;
+
+ try {
+ conn.close();
+ } catch (_) {
+ // Ignore
+ }
+ await finished;
+ },
+);
+
+// Ensure that resources don't leak during a graceful shutdown
+Deno.test(
+ { permissions: { net: true, write: true, read: true } },
+ async function httpServerShutdownGracefulResources() {
+ const waitForRequest = deferred();
+ const { finished, shutdown } = await makeServer(async (_req) => {
+ waitForRequest.resolve(null);
+ await new Promise((r) => setTimeout(r, 10));
+ return new Response((await makeTempFile(1024 * 1024)).readable);
+ });
+
+ const f = fetch(`http://localhost:${servePort}`);
+ await waitForRequest;
+ assertEquals((await (await f).text()).length, 1048576);
+ await shutdown();
+ await finished;
+ },
+);
+
+// Ensure that resources don't leak during a graceful shutdown
+Deno.test(
+ { permissions: { net: true, write: true, read: true } },
+ async function httpServerShutdownGracefulResources2() {
+ const waitForAbort = deferred();
+ const waitForRequest = deferred();
+ const { finished, shutdown } = await makeServer(async (_req) => {
+ waitForRequest.resolve(null);
+ await waitForAbort;
+ await new Promise((r) => setTimeout(r, 10));
+ return new Response((await makeTempFile(1024 * 1024)).readable);
+ });
+
+ const f = fetch(`http://localhost:${servePort}`);
+ await waitForRequest;
+ const s = shutdown();
+ waitForAbort.resolve(null);
+ assertEquals((await (await f).text()).length, 1048576);
+ await s;
+ await finished;
+ },
+);
+
Deno.test(
{ permissions: { read: true, run: true } },
async function httpServerUnref() {
@@ -2459,7 +2689,9 @@ for (const url of ["text", "file", "stream"]) {
// Give it a few milliseconds for the serve machinery to work
await new Promise((r) => setTimeout(r, 10));
- ac.abort();
+ // Since the handler has a chance of creating resources or running async ops, we need to use a
+ // graceful shutdown here to ensure they have fully drained.
+ await server.shutdown();
await server.finished;
},
});
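
The comment in the hunk above explains why this test switches from `ac.abort()` to `server.shutdown()`: handlers may create resources or run async ops that must fully drain. As a minimal caller-side sketch (not part of the patch; the port and file path are placeholders), this is the kind of handler that guarantee protects:

```ts
// Illustrative only: a response body that owns a file resource. With an abrupt
// abort, the connection could be torn down while the FsFile is still open;
// shutdown() lets the in-flight stream finish before the server winds down.
const server = Deno.serve({ port: 8000 }, async (_req) => {
  const file = await Deno.open("./data.bin", { read: true });
  return new Response(file.readable);
});

// ...later, during teardown...
await server.shutdown(); // stop listening, let in-flight responses drain
await server.finished;
```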
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 6a6d6c836..c8b857dc6 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -1936,6 +1936,17 @@ declare namespace Deno {
/** The value of this unsigned 64-bit integer, represented as a bigint. */
readonly value: bigint;
}
+
+ /** An instance of the server created using the `Deno.serve()` API.
+ *
+ * @category HTTP Server
+ */
+ export interface Server {
+ /** Gracefully close the server. No new connections will be accepted,
+ * while pending requests will be allowed to finish.
+ */
+ shutdown(): Promise<void>;
+ }
}
/** **UNSTABLE**: New API, yet to be vetted.
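
A minimal usage sketch of the `shutdown()` method declared above (the port is arbitrary; at the time of this change the API is unstable, per the `Deno.Server.shutdown` unstable check added later in this diff, so it requires `--unstable`):

```ts
const server = Deno.serve({ port: 8000 }, () => new Response("ok"));

// Gracefully wind down: stop accepting connections, let pending requests finish.
await server.shutdown();
await server.finished;
```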
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 914205889..aeebca93d 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -4,7 +4,7 @@ const core = globalThis.Deno.core;
const primordials = globalThis.__bootstrap.primordials;
const internals = globalThis.__bootstrap.internals;
-const { BadResourcePrototype } = core;
+const { BadResourcePrototype, InterruptedPrototype } = core;
import { InnerBody } from "ext:deno_fetch/22_body.js";
import { Event } from "ext:deno_web/02_event.js";
import {
@@ -65,6 +65,8 @@ const {
op_http_upgrade_websocket_next,
op_http_try_wait,
op_http_wait,
+ op_http_cancel,
+ op_http_close,
} = core.ensureFastOps();
const _upgraded = Symbol("_upgraded");
@@ -334,11 +336,15 @@ class CallbackContext {
fallbackHost;
serverRid;
closed;
+ closing;
constructor(signal, args) {
+ // The abort signal triggers a non-graceful shutdown
signal?.addEventListener(
"abort",
- () => this.close(),
+ () => {
+ op_http_cancel(this.serverRid, false);
+ },
{ once: true },
);
this.abortController = new AbortController();
@@ -630,6 +636,9 @@ function serveHttpOn(context, callback) {
if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
break;
}
+ if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) {
+ break;
+ }
throw new Deno.errors.Http(error);
}
if (req === -1) {
@@ -637,10 +646,24 @@ function serveHttpOn(context, callback) {
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
+
+ if (!context.closed && !context.closing) {
+ context.closed = true;
+ await op_http_close(rid, false);
+ context.close();
+ }
})();
return {
finished,
+ async shutdown() {
+ if (!context.closed && !context.closing) {
+ // Shut this HTTP server down gracefully
+ context.closing = true;
+ await op_http_close(context.serverRid, true);
+ context.closed = true;
+ }
+ },
then() {
throw new Error(
"Deno.serve no longer returns a promise. await server.finished instead of server.",
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 476a55a80..94f6f1241 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
+use crate::slab::http_trace;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
use crate::slab::slab_insert;
use crate::slab::HttpRequestBodyAutocloser;
+use crate::slab::RefCount;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
@@ -70,6 +72,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
+use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -690,7 +693,10 @@ pub async fn op_http_track(
.resource_table
.get::<HttpJoinHandle>(server_rid)?;
- match handle.or_cancel(join_handle.cancel_handle()).await {
+ match handle
+ .or_cancel(join_handle.connection_cancel_handle())
+ .await
+ {
Ok(true) => Ok(()),
Ok(false) => {
Err(AnyError::msg("connection closed before message completed"))
@@ -705,14 +711,17 @@ pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> {
- let index = slab_insert(request, request_info);
+ let index = slab_insert(request, request_info, refcount);
let rx = slab_get(index).promise();
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
+ http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
rx.await;
+ http_trace!(index, "SlabFuture complete");
}
})
}
@@ -745,45 +754,75 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
.keep_alive(true)
.writev(*USE_WRITEV)
- .serve_connection(TokioIo::new(io), svc);
-
- conn.with_upgrades().map_err(AnyError::from)
+ .serve_connection(TokioIo::new(io), svc)
+ .with_upgrades();
+
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
- conn.map_err(AnyError::from)
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -792,33 +831,46 @@ fn serve_https(
// based on the prefix bytes
let handshake = io.get_ref().1.alpn_protocol();
if handshake == Some(TLS_ALPN_HTTP_2) {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else if handshake == Some(TLS_ALPN_HTTP_11) {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http2_autodetect(io, svc).await
+ serve_http2_autodetect(io, svc, listen_cancel_handle).await
}
}
- .try_or_cancel(cancel),
+ .try_or_cancel(connection_cancel_handle),
)
}
fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
- spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
+ spawn(
+ serve_http2_autodetect(io, svc, listen_cancel_handle)
+ .try_or_cancel(connection_cancel_handle),
+ )
}
fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>>
where
@@ -831,28 +883,58 @@ where
match network_stream {
NetworkStream::Tcp(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
NetworkStream::Tls(conn) => {
- serve_https(conn, connection_properties, cancel, tx)
+ serve_https(conn, connection_properties, lifetime, tx)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
}
}
-struct HttpJoinHandle(
- AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
- // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
- Rc<CancelHandle>,
- AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
-);
+#[derive(Clone)]
+struct HttpLifetime {
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ refcount: RefCount,
+}
+
+struct HttpJoinHandle {
+ join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ refcount: RefCount,
+}
impl HttpJoinHandle {
- fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
- self.1.clone()
+ fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ Self {
+ join_handle: AsyncRefCell::new(None),
+ connection_cancel_handle: CancelHandle::new_rc(),
+ listen_cancel_handle: CancelHandle::new_rc(),
+ rx: AsyncRefCell::new(rx),
+ refcount: RefCount::default(),
+ }
+ }
+
+ fn lifetime(self: &Rc<Self>) -> HttpLifetime {
+ HttpLifetime {
+ connection_cancel_handle: self.connection_cancel_handle.clone(),
+ listen_cancel_handle: self.listen_cancel_handle.clone(),
+ refcount: self.refcount.clone(),
+ }
+ }
+
+ fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.connection_cancel_handle.clone()
+ }
+
+ fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.listen_cancel_handle.clone()
}
}
@@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle {
}
fn close(self: Rc<Self>) {
- self.1.cancel()
+ // During a close operation, we cancel everything
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
impl Drop for HttpJoinHandle {
fn drop(&mut self) {
// In some cases we may be dropped without closing, so let's cancel everything on the way out
- self.1.cancel();
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
@@ -890,23 +975,21 @@ where
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
- let cancel_clone = resource.cancel_handle();
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
+ let listen_cancel_clone = resource.listen_cancel_handle();
+
+ let lifetime = resource.lifetime();
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
- .try_or_cancel(cancel_clone.clone())
+ .try_or_cancel(listen_cancel_clone.clone())
.await?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
- cancel_clone.clone(),
+ lifetime.clone(),
tx.clone(),
);
}
@@ -915,7 +998,7 @@ where
});
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -943,22 +1026,18 @@ where
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>(
connection,
&listen_properties,
- resource.cancel_handle(),
+ resource.lifetime(),
tx,
);
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
// If join handle is somehow locked, just abort.
let Some(mut handle) =
- RcRef::map(&join_handle, |this| &this.2).try_borrow_mut()
+ RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
return SlabId::MAX;
};
@@ -1006,9 +1085,9 @@ pub async fn op_http_wait(
.resource_table
.get::<HttpJoinHandle>(rid)?;
- let cancel = join_handle.cancel_handle();
+ let cancel = join_handle.listen_cancel_handle();
let next = async {
- let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
+ let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
recv.recv().await
}
.or_cancel(cancel)
@@ -1021,19 +1100,13 @@ pub async fn op_http_wait(
}
// No - we're shutting down
- let res = RcRef::map(join_handle, |this| &this.0)
+ let res = RcRef::map(join_handle, |this| &this.join_handle)
.borrow_mut()
.await
.take()
.unwrap()
.await?;
- // Drop the cancel and join handles
- state
- .borrow_mut()
- .resource_table
- .take::<HttpJoinHandle>(rid)?;
-
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
if let Some(err) = err.source() {
@@ -1049,6 +1122,63 @@ pub async fn op_http_wait(
Ok(SlabId::MAX)
}
+/// Cancels the HTTP handle.
+#[op2(fast)]
+pub fn op_http_cancel(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ Ok(())
+}
+
+#[op2(async)]
+pub async fn op_http_close(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state
+ .borrow_mut()
+ .resource_table
+ .take::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ deno_net::check_unstable2(&state, "Deno.Server.shutdown");
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ // Async spin on the refcount while we wait for everything to drain
+ while Rc::strong_count(&join_handle.refcount.0) > 1 {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+
+ let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
+ .borrow_mut()
+ .await;
+ if let Some(join_handle) = join_handle.take() {
+ join_handle.await??;
+ }
+
+ Ok(())
+}
+
struct UpgradeStream {
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 93ea0895e..719dcd6de 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -126,6 +126,8 @@ deno_core::extension!(
http_next::op_can_write_vectored,
http_next::op_http_try_wait,
http_next::op_http_wait,
+ http_next::op_http_close,
+ http_next::op_http_cancel,
],
esm = ["00_serve.js", "01_http.js"],
);
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
index 8dd562cc2..4718aded1 100644
--- a/ext/http/slab.rs
+++ b/ext/http/slab.rs
@@ -20,6 +20,10 @@ pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;
+#[repr(transparent)]
+#[derive(Clone, Default)]
+pub struct RefCount(pub Rc<()>);
+
enum RequestBodyState {
Incoming(Incoming),
Resource(HttpRequestBodyAutocloser),
@@ -50,24 +54,27 @@ pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<RequestBodyState>,
- // The response may get taken before we tear this down
+ /// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
trailers: Rc<RefCell<Option<HeaderMap>>>,
been_dropped: bool,
+ /// Use an `Rc` to keep track of outstanding requests. We never read this directly,
+ /// but when it drops, it decrements the refcount of the server itself.
+ refcount: Option<RefCount>,
#[cfg(feature = "__zombie_http_tracking")]
alive: bool,
}
thread_local! {
- static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
+ pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
}
macro_rules! http_trace {
($index:expr, $args:tt) => {
#[cfg(feature = "__http_tracing")]
{
- let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
+ let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len()));
if let Ok(total) = total {
println!("HTTP id={} total={}: {}", $index, total, format!($args));
} else {
@@ -77,6 +84,8 @@ macro_rules! http_trace {
};
}
+pub(crate) use http_trace;
+
/// Hold a lock on the slab table and a reference to one entry in the table.
pub struct SlabEntry(
NonNull<HttpSlabRecord>,
@@ -121,6 +130,7 @@ fn slab_insert_raw(
request_parts: Parts,
request_body: Option<Incoming>,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
) -> SlabId {
let index = SLAB.with(|slab| {
let mut slab = slab.borrow_mut();
@@ -135,6 +145,7 @@ fn slab_insert_raw(
trailers,
been_dropped: false,
promise: CompletionHandle::default(),
+ refcount: Some(refcount),
#[cfg(feature = "__zombie_http_tracking")]
alive: true,
})
@@ -146,9 +157,10 @@ fn slab_insert_raw(
pub fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
) -> SlabId {
let (request_parts, request_body) = request.into_parts();
- slab_insert_raw(request_parts, Some(request_body), request_info)
+ slab_insert_raw(request_parts, Some(request_body), request_info, refcount)
}
pub fn slab_drop(index: SlabId) {
@@ -159,10 +171,21 @@ pub fn slab_drop(index: SlabId) {
!record.been_dropped,
"HTTP state error: Entry has already been dropped"
);
+
+ // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND
+ // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished
+ // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which
+ // might include actual resources, and the refcount, which is keeping the server alive.
record.been_dropped = true;
if record.promise.is_completed() {
drop(entry);
slab_expunge(index);
+ } else {
+ // Take the request body, as the future has been dropped and this will allow some resources to close
+ record.request_body.take();
+ // Take the refcount keeping the server alive. The future is no longer alive, which means this request
+ // is toast.
+ record.refcount.take();
}
}
@@ -318,6 +341,7 @@ mod tests {
local_port: None,
stream_type: NetworkStreamType::Tcp,
},
+ RefCount::default(),
);
let entry = slab_get(id);
entry.complete();