diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-09-11 18:06:38 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-12 00:06:38 +0000 |
commit | 950e0e9cd65bd634d59fe60d5a0cb8651958c7fb (patch) | |
tree | 400dd0eaacf7ce1018a07e2e232e6d62f54112ad /cli | |
parent | bfd230fd78ef7867eec75e6b21715f51e72e7a23 (diff) |
fix(ext/http): create a graceful shutdown API (#20387)
This PR implements a graceful shutdown API for Deno.serve, allowing all
current connections to drain from the server before shutting down, while
preventing new connections from being started or new transactions on
existing connections from being created.
We split the cancellation handle into two parts: a listener handle, and
a connection handle. A graceful shutdown cancels the listener only,
while allowing the connections to drain. The connection handle aborts
all futures. If the listener handle is cancelled, we put the connections
into graceful shutdown mode, which disables keep-alive on http/1.1 and
uses http/2 mechanisms for http/2 connections.
In addition, we now guarantee that all connections are complete or
cancelled, and all resources are cleaned up when the server `finished`
promise resolves -- we use a Rust-side server refcount for this.
Performance impact: does not appear to affect basic serving performance
by more than 1% (~126k -> ~125k)
---------
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
Diffstat (limited to 'cli')
-rw-r--r-- | cli/Cargo.toml | 1 | ||||
-rw-r--r-- | cli/tests/unit/serve_test.ts | 244 | ||||
-rw-r--r-- | cli/tsc/dts/lib.deno.unstable.d.ts | 11 |
3 files changed, 249 insertions, 7 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml index c08e66180..42f868ef9 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -116,7 +116,6 @@ thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true tower-lsp.workspace = true -tracing.workspace = true twox-hash = "=1.6.3" typed-arena = "=2.0.1" uuid = { workspace = true, features = ["serde"] } diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index d1ac82696..3f58903a8 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -41,27 +41,257 @@ function onListen<T>( }; } -Deno.test(async function httpServerShutsDownPortBeforeResolving() { +async function makeServer( + handler: (req: Request) => Response | Promise<Response>, +): Promise< + { finished: Promise<void>; abort: () => void; shutdown: () => Promise<void> } +> { const ac = new AbortController(); const listeningPromise = deferred(); const server = Deno.serve({ - handler: (_req) => new Response("ok"), + handler, port: servePort, signal: ac.signal, onListen: onListen(listeningPromise), }); await listeningPromise; - assertThrows(() => Deno.listen({ port: servePort })); + return { + finished: server.finished, + abort() { + ac.abort(); + }, + async shutdown() { + await server.shutdown(); + }, + }; +} - ac.abort(); - await server.finished; +Deno.test(async function httpServerShutsDownPortBeforeResolving() { + const { finished, abort } = await makeServer((_req) => new Response("ok")); + assertThrows(() => Deno.listen({ port: servePort })); + abort(); + await finished; const listener = Deno.listen({ port: servePort }); listener!.close(); }); +// When shutting down abruptly, we require that all in-progress connections are aborted, +// no new connections are allowed, and no new transactions are allowed on existing connections. +Deno.test( + { permissions: { net: true } }, + async function httpServerShutdownAbruptGuaranteeHttp11() { + const promiseQueue: { input: Deferred<string>; out: Deferred<void> }[] = []; + const { finished, abort } = await makeServer((_req) => { + const { input, out } = promiseQueue.shift()!; + return new Response( + new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([46])); + out.resolve(undefined); + controller.enqueue(encoder.encode(await input)); + controller.close(); + }, + }), + ); + }); + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const conn = await Deno.connect({ port: servePort }); + const w = conn.writable.getWriter(); + const r = conn.readable.getReader(); + + const deferred1 = { input: deferred<string>(), out: deferred<void>() }; + promiseQueue.push(deferred1); + const deferred2 = { input: deferred<string>(), out: deferred<void>() }; + promiseQueue.push(deferred2); + const deferred3 = { input: deferred<string>(), out: deferred<void>() }; + promiseQueue.push(deferred3); + deferred1.input.resolve("#"); + deferred2.input.resolve("$"); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + + // Fully read two responses + let text = ""; + while (!text.includes("$\r\n")) { + text += decoder.decode((await r.read()).value); + } + + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await deferred3.out; + + // This is half served, so wait for the chunk that has the first '.' + text = ""; + while (!text.includes("1\r\n.\r\n")) { + text += decoder.decode((await r.read()).value); + } + + abort(); + + // This doesn't actually write anything, but we release it after aborting + deferred3.input.resolve("!"); + + // Guarantee: can't connect to an aborted server (though this may not happen immediately) + let failed = false; + for (let i = 0; i < 10; i++) { + try { + const conn = await Deno.connect({ port: servePort }); + conn.close(); + // Give the runtime a few ticks to settle (required for Windows) + await new Promise((r) => setTimeout(r, 2 ** i)); + continue; + } catch (_) { + failed = true; + break; + } + } + assert(failed, "The Deno.serve listener was not disabled promptly"); + + // Guarantee: the pipeline is closed abruptly + assert((await r.read()).done); + + try { + conn.close(); + } catch (_) { + // Ignore + } + await finished; + }, +); + +// When shutting down abruptly, we require that all in-progress connections are aborted, +// no new connections are allowed, and no new transactions are allowed on existing connections. +Deno.test( + { permissions: { net: true } }, + async function httpServerShutdownGracefulGuaranteeHttp11() { + const promiseQueue: { input: Deferred<string>; out: Deferred<void> }[] = []; + const { finished, shutdown } = await makeServer((_req) => { + const { input, out } = promiseQueue.shift()!; + return new Response( + new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([46])); + out.resolve(undefined); + controller.enqueue(encoder.encode(await input)); + controller.close(); + }, + }), + ); + }); + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const conn = await Deno.connect({ port: servePort }); + const w = conn.writable.getWriter(); + const r = conn.readable.getReader(); + + const deferred1 = { input: deferred<string>(), out: deferred<void>() }; + promiseQueue.push(deferred1); + const deferred2 = { input: deferred<string>(), out: deferred<void>() }; + promiseQueue.push(deferred2); + const deferred3 = { input: deferred<string>(), out: deferred<void>() }; + promiseQueue.push(deferred3); + deferred1.input.resolve("#"); + deferred2.input.resolve("$"); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + + // Fully read two responses + let text = ""; + while (!text.includes("$\r\n")) { + text += decoder.decode((await r.read()).value); + } + + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await deferred3.out; + + // This is half served, so wait for the chunk that has the first '.' + text = ""; + while (!text.includes("1\r\n.\r\n")) { + text += decoder.decode((await r.read()).value); + } + + const shutdownPromise = shutdown(); + + // Release the final response _after_ we shut down + deferred3.input.resolve("!"); + + // Guarantee: can't connect to an aborted server (though this may not happen immediately) + let failed = false; + for (let i = 0; i < 10; i++) { + try { + const conn = await Deno.connect({ port: servePort }); + conn.close(); + // Give the runtime a few ticks to settle (required for Windows) + await new Promise((r) => setTimeout(r, 2 ** i)); + continue; + } catch (_) { + failed = true; + break; + } + } + assert(failed, "The Deno.serve listener was not disabled promptly"); + + // Guarantee: existing connections fully drain + while (!text.includes("!\r\n")) { + text += decoder.decode((await r.read()).value); + } + + await shutdownPromise; + + try { + conn.close(); + } catch (_) { + // Ignore + } + await finished; + }, +); + +// Ensure that resources don't leak during a graceful shutdown +Deno.test( + { permissions: { net: true, write: true, read: true } }, + async function httpServerShutdownGracefulResources() { + const waitForRequest = deferred(); + const { finished, shutdown } = await makeServer(async (_req) => { + waitForRequest.resolve(null); + await new Promise((r) => setTimeout(r, 10)); + return new Response((await makeTempFile(1024 * 1024)).readable); + }); + + const f = fetch(`http://localhost:${servePort}`); + await waitForRequest; + assertEquals((await (await f).text()).length, 1048576); + await shutdown(); + await finished; + }, +); + +// Ensure that resources don't leak during a graceful shutdown +Deno.test( + { permissions: { net: true, write: true, read: true } }, + async function httpServerShutdownGracefulResources2() { + const waitForAbort = deferred(); + const waitForRequest = deferred(); + const { finished, shutdown } = await makeServer(async (_req) => { + waitForRequest.resolve(null); + await waitForAbort; + await new Promise((r) => setTimeout(r, 10)); + return new Response((await makeTempFile(1024 * 1024)).readable); + }); + + const f = fetch(`http://localhost:${servePort}`); + await waitForRequest; + const s = shutdown(); + waitForAbort.resolve(null); + assertEquals((await (await f).text()).length, 1048576); + await s; + await finished; + }, +); + Deno.test( { permissions: { read: true, run: true } }, async function httpServerUnref() { @@ -2459,7 +2689,9 @@ for (const url of ["text", "file", "stream"]) { // Give it a few milliseconds for the serve machinery to work await new Promise((r) => setTimeout(r, 10)); - ac.abort(); + // Since the handler has a chance of creating resources or running async ops, we need to use a + // graceful shutdown here to ensure they have fully drained. + await server.shutdown(); await server.finished; }, }); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 6a6d6c836..c8b857dc6 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1936,6 +1936,17 @@ declare namespace Deno { /** The value of this unsigned 64-bit integer, represented as a bigint. */ readonly value: bigint; } + + /** An instance of the server created using `Deno.serve()` API. + * + * @category HTTP Server + */ + export interface Server { + /** Gracefully close the server. No more new connections will be accepted, + * while pending requests will be allowed to finish. + */ + shutdown(): Promise<void>; + } } /** **UNSTABLE**: New API, yet to be vetted. |