From eed2598e6cf1db643b4edd07b5eff94c59eb9408 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 24 Apr 2024 14:03:37 -0400 Subject: feat(ext/http): Implement request.signal for Deno.serve (#23425) When the response has been successfully send, we abort the `Request.signal` property to indicate that all resources associated with this transaction may be torn down. --- cli/tsc/dts/lib.deno.unstable.d.ts | 11 + ext/http/00_serve.js | 741 ---------------------------------- ext/http/00_serve.ts | 800 +++++++++++++++++++++++++++++++++++++ ext/http/http_next.rs | 5 +- ext/http/lib.rs | 2 +- ext/http/response_body.rs | 15 +- ext/http/service.rs | 12 +- ext/node/polyfills/http.ts | 2 +- ext/node/polyfills/http2.ts | 2 +- runtime/js/90_deno_ns.js | 2 +- tests/unit/serve_test.ts | 17 +- tools/core_import_map.json | 2 +- 12 files changed, 848 insertions(+), 763 deletions(-) delete mode 100644 ext/http/00_serve.js create mode 100644 ext/http/00_serve.ts diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 76656108b..0c1ab8af0 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -10,6 +10,17 @@ declare namespace Deno { export {}; // stop default export type behavior + /** Information for a HTTP request. + * + * @category HTTP Server + */ + export interface ServeHandlerInfo { + /** The remote address of the connection. */ + remoteAddr: Deno.NetAddr; + /** The completion promise */ + completed: Promise; + } + /** **UNSTABLE**: New API, yet to be vetted. * * Retrieve the process umask. If `mask` is provided, sets the process umask. diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js deleted file mode 100644 index 52b833f10..000000000 --- a/ext/http/00_serve.js +++ /dev/null @@ -1,741 +0,0 @@ -// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. - -import { core, internals, primordials } from "ext:core/mod.js"; -const { - BadResourcePrototype, - InterruptedPrototype, - internalRidSymbol, -} = core; -import { - op_http_cancel, - op_http_close, - op_http_close_after_finish, - op_http_get_request_headers, - op_http_get_request_method_and_url, - op_http_read_request_body, - op_http_serve, - op_http_serve_on, - op_http_set_promise_complete, - op_http_set_response_body_bytes, - op_http_set_response_body_resource, - op_http_set_response_body_text, - op_http_set_response_header, - op_http_set_response_headers, - op_http_set_response_trailers, - op_http_try_wait, - op_http_upgrade_raw, - op_http_upgrade_websocket_next, - op_http_wait, -} from "ext:core/ops"; -const { - ArrayPrototypePush, - ObjectHasOwn, - ObjectPrototypeIsPrototypeOf, - PromisePrototypeCatch, - PromisePrototypeThen, - Symbol, - TypeError, - TypedArrayPrototypeGetSymbolToStringTag, - Uint8Array, -} = primordials; - -import { InnerBody } from "ext:deno_fetch/22_body.js"; -import { Event } from "ext:deno_web/02_event.js"; -import { - fromInnerResponse, - newInnerResponse, - ResponsePrototype, - toInnerResponse, -} from "ext:deno_fetch/23_response.js"; -import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js"; -import { AbortController } from "ext:deno_web/03_abort_signal.js"; -import { - _eventLoop, - _idleTimeoutDuration, - _idleTimeoutTimeout, - _protocol, - _readyState, - _rid, - _role, - _server, - _serverHandleIdleTimeout, - SERVER, - WebSocket, -} from "ext:deno_websocket/01_websocket.js"; -import { - Deferred, - getReadableStreamResourceBacking, - readableStreamForRid, - ReadableStreamPrototype, - resourceForReadableStream, -} from "ext:deno_web/06_streams.js"; -import { listen, listenOptionApiName, TcpConn } from "ext:deno_net/01_net.js"; -import { hasTlsKeyPairOptions, listenTls } from "ext:deno_net/02_tls.js"; -import { SymbolAsyncDispose } from "ext:deno_web/00_infra.js"; - -const _upgraded = Symbol("_upgraded"); - -function internalServerError() { - // "Internal Server Error" - return new Response( - new Uint8Array([ - 73, - 110, - 116, - 101, - 114, - 110, - 97, - 108, - 32, - 83, - 101, - 114, - 118, - 101, - 114, - 32, - 69, - 114, - 114, - 111, - 114, - ]), - { status: 500 }, - ); -} - -// Used to ensure that user returns a valid response (but not a different response) from handlers that are upgraded. -const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse( - newInnerResponse(101), - "immutable", -); - -function upgradeHttpRaw(req, conn) { - const inner = toInnerRequest(req); - if (inner._wantsUpgrade) { - return inner._wantsUpgrade("upgradeHttpRaw", conn); - } - throw new TypeError("upgradeHttpRaw may only be used with Deno.serve"); -} - -function addTrailers(resp, headerList) { - const inner = toInnerResponse(resp); - op_http_set_response_trailers(inner.external, headerList); -} - -class InnerRequest { - #external; - #context; - #methodAndUri; - #streamRid; - #body; - #upgraded; - #urlValue; - - constructor(external, context) { - this.#external = external; - this.#context = context; - this.#upgraded = false; - } - - close() { - this.#external = null; - } - - get [_upgraded]() { - return this.#upgraded; - } - - _wantsUpgrade(upgradeType, ...originalArgs) { - if (this.#upgraded) { - throw new Deno.errors.Http("already upgraded"); - } - if (this.#external === null) { - throw new Deno.errors.Http("already closed"); - } - - // upgradeHttpRaw is sync - if (upgradeType == "upgradeHttpRaw") { - const external = this.#external; - const underlyingConn = originalArgs[0]; - - this.url(); - this.headerList; - this.close(); - - this.#upgraded = () => {}; - - const upgradeRid = op_http_upgrade_raw(external); - - const conn = new TcpConn( - upgradeRid, - underlyingConn?.remoteAddr, - underlyingConn?.localAddr, - ); - - return { response: UPGRADE_RESPONSE_SENTINEL, conn }; - } - - // upgradeWebSocket is sync - if (upgradeType == "upgradeWebSocket") { - const response = originalArgs[0]; - const ws = originalArgs[1]; - - const external = this.#external; - - this.url(); - this.headerList; - this.close(); - - const goAhead = new Deferred(); - this.#upgraded = () => { - goAhead.resolve(); - }; - const wsPromise = op_http_upgrade_websocket_next( - external, - response.headerList, - ); - - // Start the upgrade in the background. - (async () => { - try { - // Returns the upgraded websocket connection - const wsRid = await wsPromise; - - // We have to wait for the go-ahead signal - await goAhead; - - ws[_rid] = wsRid; - ws[_readyState] = WebSocket.OPEN; - ws[_role] = SERVER; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - if (ws[_idleTimeoutDuration]) { - ws.addEventListener( - "close", - () => clearTimeout(ws[_idleTimeoutTimeout]), - ); - } - ws[_serverHandleIdleTimeout](); - } catch (error) { - const event = new ErrorEvent("error", { error }); - ws.dispatchEvent(event); - } - })(); - return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws }; - } - } - - url() { - if (this.#urlValue !== undefined) { - return this.#urlValue; - } - - if (this.#methodAndUri === undefined) { - if (this.#external === null) { - throw new TypeError("request closed"); - } - // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider - // splitting this up into multiple ops. - this.#methodAndUri = op_http_get_request_method_and_url(this.#external); - } - - const path = this.#methodAndUri[2]; - - // * is valid for OPTIONS - if (path === "*") { - return this.#urlValue = "*"; - } - - // If the path is empty, return the authority (valid for CONNECT) - if (path == "") { - return this.#urlValue = this.#methodAndUri[1]; - } - - // CONNECT requires an authority - if (this.#methodAndUri[0] == "CONNECT") { - return this.#urlValue = this.#methodAndUri[1]; - } - - const hostname = this.#methodAndUri[1]; - if (hostname) { - // Construct a URL from the scheme, the hostname, and the path - return this.#urlValue = this.#context.scheme + hostname + path; - } - - // Construct a URL from the scheme, the fallback hostname, and the path - return this.#urlValue = this.#context.scheme + this.#context.fallbackHost + - path; - } - - get remoteAddr() { - const transport = this.#context.listener?.addr.transport; - if (transport === "unix" || transport === "unixpacket") { - return { - transport, - path: this.#context.listener.addr.path, - }; - } - if (this.#methodAndUri === undefined) { - if (this.#external === null) { - throw new TypeError("request closed"); - } - this.#methodAndUri = op_http_get_request_method_and_url(this.#external); - } - return { - transport: "tcp", - hostname: this.#methodAndUri[3], - port: this.#methodAndUri[4], - }; - } - - get method() { - if (this.#methodAndUri === undefined) { - if (this.#external === null) { - throw new TypeError("request closed"); - } - this.#methodAndUri = op_http_get_request_method_and_url(this.#external); - } - return this.#methodAndUri[0]; - } - - get body() { - if (this.#external === null) { - throw new TypeError("request closed"); - } - if (this.#body !== undefined) { - return this.#body; - } - // If the method is GET or HEAD, we do not want to include a body here, even if the Rust - // side of the code is willing to provide it to us. - if (this.method == "GET" || this.method == "HEAD") { - this.#body = null; - return null; - } - this.#streamRid = op_http_read_request_body(this.#external); - this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false)); - return this.#body; - } - - get headerList() { - if (this.#external === null) { - throw new TypeError("request closed"); - } - const headers = []; - const reqHeaders = op_http_get_request_headers(this.#external); - for (let i = 0; i < reqHeaders.length; i += 2) { - ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]); - } - return headers; - } - - get external() { - return this.#external; - } -} - -class CallbackContext { - abortController; - scheme; - fallbackHost; - serverRid; - closed; - /** @type {Promise | undefined} */ - closing; - listener; - - constructor(signal, args, listener) { - // The abort signal triggers a non-graceful shutdown - signal?.addEventListener( - "abort", - () => { - op_http_cancel(this.serverRid, false); - }, - { once: true }, - ); - this.abortController = new AbortController(); - this.serverRid = args[0]; - this.scheme = args[1]; - this.fallbackHost = args[2]; - this.closed = false; - this.listener = listener; - } - - close() { - try { - this.closed = true; - core.tryClose(this.serverRid); - } catch { - // Pass - } - } -} - -class ServeHandlerInfo { - #inner = null; - constructor(inner) { - this.#inner = inner; - } - get remoteAddr() { - return this.#inner.remoteAddr; - } -} - -function fastSyncResponseOrStream(req, respBody, status, innerRequest) { - if (respBody === null || respBody === undefined) { - // Don't set the body - innerRequest?.close(); - op_http_set_promise_complete(req, status); - return; - } - - const stream = respBody.streamOrStatic; - const body = stream.body; - - if (TypedArrayPrototypeGetSymbolToStringTag(body) === "Uint8Array") { - innerRequest?.close(); - op_http_set_response_body_bytes(req, body, status); - return; - } - - if (typeof body === "string") { - innerRequest?.close(); - op_http_set_response_body_text(req, body, status); - return; - } - - // At this point in the response it needs to be a stream - if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { - innerRequest?.close(); - throw TypeError("invalid response"); - } - const resourceBacking = getReadableStreamResourceBacking(stream); - let rid, autoClose; - if (resourceBacking) { - rid = resourceBacking.rid; - autoClose = resourceBacking.autoClose; - } else { - rid = resourceForReadableStream(stream); - autoClose = true; - } - PromisePrototypeThen( - op_http_set_response_body_resource( - req, - rid, - autoClose, - status, - ), - () => { - innerRequest?.close(); - op_http_close_after_finish(req); - }, - ); -} - -/** - * Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided - * callback, then extracts the response that was returned from that callback. The response is then pulled - * apart and handled on the Rust side. - * - * This function returns a promise that will only reject in the case of abnormal exit. - */ -function mapToCallback(context, callback, onError) { - const signal = context.abortController.signal; - - return async function (req) { - // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback - // 500 error. - let innerRequest; - let response; - try { - innerRequest = new InnerRequest(req, context); - response = await callback( - fromInnerRequest(innerRequest, signal, "immutable"), - new ServeHandlerInfo(innerRequest), - ); - - // Throwing Error if the handler return value is not a Response class - if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { - throw TypeError( - "Return value from serve handler must be a response or a promise resolving to a response", - ); - } - } catch (error) { - try { - response = await onError(error); - if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { - throw TypeError( - "Return value from onError handler must be a response or a promise resolving to a response", - ); - } - } catch (error) { - console.error("Exception in onError while handling exception", error); - response = internalServerError(); - } - } - const inner = toInnerResponse(response); - if (innerRequest?.[_upgraded]) { - // We're done here as the connection has been upgraded during the callback and no longer requires servicing. - if (response !== UPGRADE_RESPONSE_SENTINEL) { - console.error("Upgrade response was not returned from callback"); - context.close(); - } - innerRequest?.[_upgraded](); - return; - } - - // Did everything shut down while we were waiting? - if (context.closed) { - // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate - innerRequest?.close(); - op_http_set_promise_complete(req, 503); - return; - } - - const status = inner.status; - const headers = inner.headerList; - if (headers && headers.length > 0) { - if (headers.length == 1) { - op_http_set_response_header(req, headers[0][0], headers[0][1]); - } else { - op_http_set_response_headers(req, headers); - } - } - - fastSyncResponseOrStream(req, inner.body, status, innerRequest); - }; -} - -function serve(arg1, arg2) { - let options = undefined; - let handler = undefined; - if (typeof arg1 === "function") { - handler = arg1; - } else if (typeof arg2 === "function") { - handler = arg2; - options = arg1; - } else { - options = arg1; - } - if (handler === undefined) { - if (options === undefined) { - throw new TypeError( - "No handler was provided, so an options bag is mandatory.", - ); - } - handler = options.handler; - } - if (typeof handler !== "function") { - throw new TypeError("A handler function must be provided."); - } - if (options === undefined) { - options = {}; - } - - const wantsHttps = hasTlsKeyPairOptions(options); - const wantsUnix = ObjectHasOwn(options, "path"); - const signal = options.signal; - const onError = options.onError ?? function (error) { - console.error(error); - return internalServerError(); - }; - - if (wantsUnix) { - const listener = listen({ - transport: "unix", - path: options.path, - [listenOptionApiName]: "Deno.serve", - }); - const path = listener.addr.path; - return serveHttpOnListener(listener, signal, handler, onError, () => { - if (options.onListen) { - options.onListen(listener.addr); - } else { - console.log(`Listening on ${path}`); - } - }); - } - - const listenOpts = { - hostname: options.hostname ?? "0.0.0.0", - port: options.port ?? 8000, - reusePort: options.reusePort ?? false, - }; - - if (options.certFile || options.keyFile) { - throw new TypeError( - "Unsupported 'certFile' / 'keyFile' options provided: use 'cert' / 'key' instead.", - ); - } - if (options.alpnProtocols) { - throw new TypeError( - "Unsupported 'alpnProtocols' option provided. 'h2' and 'http/1.1' are automatically supported.", - ); - } - - let listener; - if (wantsHttps) { - if (!options.cert || !options.key) { - throw new TypeError( - "Both cert and key must be provided to enable HTTPS.", - ); - } - listenOpts.cert = options.cert; - listenOpts.key = options.key; - listenOpts.alpnProtocols = ["h2", "http/1.1"]; - listener = listenTls(listenOpts); - listenOpts.port = listener.addr.port; - } else { - listener = listen(listenOpts); - listenOpts.port = listener.addr.port; - } - - const addr = listener.addr; - // If the hostname is "0.0.0.0", we display "localhost" in console - // because browsers in Windows don't resolve "0.0.0.0". - // See the discussion in https://github.com/denoland/deno_std/issues/1165 - const hostname = addr.hostname == "0.0.0.0" ? "localhost" : addr.hostname; - addr.hostname = hostname; - - const onListen = (scheme) => { - if (options.onListen) { - options.onListen(addr); - } else { - console.log(`Listening on ${scheme}${addr.hostname}:${addr.port}/`); - } - }; - - return serveHttpOnListener(listener, signal, handler, onError, onListen); -} - -/** - * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary listener. - */ -function serveHttpOnListener(listener, signal, handler, onError, onListen) { - const context = new CallbackContext( - signal, - op_http_serve(listener[internalRidSymbol]), - listener, - ); - const callback = mapToCallback(context, handler, onError); - - onListen(context.scheme); - - return serveHttpOn(context, listener.addr, callback); -} - -/** - * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary connection. - */ -function serveHttpOnConnection(connection, signal, handler, onError, onListen) { - const context = new CallbackContext( - signal, - op_http_serve_on(connection[internalRidSymbol]), - null, - ); - const callback = mapToCallback(context, handler, onError); - - onListen(context.scheme); - - return serveHttpOn(context, connection.localAddr, callback); -} - -function serveHttpOn(context, addr, callback) { - let ref = true; - let currentPromise = null; - - const promiseErrorHandler = (error) => { - // Abnormal exit - console.error( - "Terminating Deno.serve loop due to unexpected error", - error, - ); - context.close(); - }; - - // Run the server - const finished = (async () => { - const rid = context.serverRid; - while (true) { - let req; - try { - // Attempt to pull as many requests out of the queue as possible before awaiting. This API is - // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong. - while ((req = op_http_try_wait(rid)) !== null) { - PromisePrototypeCatch(callback(req), promiseErrorHandler); - } - currentPromise = op_http_wait(rid); - if (!ref) { - core.unrefOpPromise(currentPromise); - } - req = await currentPromise; - currentPromise = null; - } catch (error) { - if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { - break; - } - if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) { - break; - } - throw new Deno.errors.Http(error); - } - if (req === null) { - break; - } - PromisePrototypeCatch(callback(req), promiseErrorHandler); - } - - if (!context.closing && !context.closed) { - context.closing = op_http_close(rid, false); - context.close(); - } - - await context.closing; - context.close(); - context.closed = true; - })(); - - return { - addr, - finished, - async shutdown() { - if (!context.closing && !context.closed) { - // Shut this HTTP server down gracefully - context.closing = op_http_close(context.serverRid, true); - } - await context.closing; - context.closed = true; - }, - ref() { - ref = true; - if (currentPromise) { - core.refOpPromise(currentPromise); - } - }, - unref() { - ref = false; - if (currentPromise) { - core.unrefOpPromise(currentPromise); - } - }, - [SymbolAsyncDispose]() { - return this.shutdown(); - }, - }; -} - -internals.addTrailers = addTrailers; -internals.upgradeHttpRaw = upgradeHttpRaw; -internals.serveHttpOnListener = serveHttpOnListener; -internals.serveHttpOnConnection = serveHttpOnConnection; - -export { - addTrailers, - serve, - serveHttpOnConnection, - serveHttpOnListener, - upgradeHttpRaw, -}; diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts new file mode 100644 index 000000000..1063f9691 --- /dev/null +++ b/ext/http/00_serve.ts @@ -0,0 +1,800 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { core, internals, primordials } from "ext:core/mod.js"; +const { + BadResourcePrototype, + InterruptedPrototype, + Interrupted, + internalRidSymbol, +} = core; +import { + op_http_cancel, + op_http_close, + op_http_close_after_finish, + op_http_get_request_headers, + op_http_get_request_method_and_url, + op_http_read_request_body, + op_http_serve, + op_http_serve_on, + op_http_set_promise_complete, + op_http_set_response_body_bytes, + op_http_set_response_body_resource, + op_http_set_response_body_text, + op_http_set_response_header, + op_http_set_response_headers, + op_http_set_response_trailers, + op_http_try_wait, + op_http_upgrade_raw, + op_http_upgrade_websocket_next, + op_http_wait, +} from "ext:core/ops"; +const { + ArrayPrototypePush, + ObjectHasOwn, + ObjectPrototypeIsPrototypeOf, + PromisePrototypeCatch, + PromisePrototypeThen, + Symbol, + TypeError, + TypedArrayPrototypeGetSymbolToStringTag, + Uint8Array, + Promise, +} = primordials; + +import { InnerBody } from "ext:deno_fetch/22_body.js"; +import { Event } from "ext:deno_web/02_event.js"; +import { + fromInnerResponse, + newInnerResponse, + ResponsePrototype, + toInnerResponse, +} from "ext:deno_fetch/23_response.js"; +import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js"; +import { AbortController } from "ext:deno_web/03_abort_signal.js"; +import { + _eventLoop, + _idleTimeoutDuration, + _idleTimeoutTimeout, + _protocol, + _readyState, + _rid, + _role, + _server, + _serverHandleIdleTimeout, + SERVER, + WebSocket, +} from "ext:deno_websocket/01_websocket.js"; +import { + Deferred, + getReadableStreamResourceBacking, + readableStreamForRid, + ReadableStreamPrototype, + resourceForReadableStream, +} from "ext:deno_web/06_streams.js"; +import { listen, listenOptionApiName, TcpConn } from "ext:deno_net/01_net.js"; +import { hasTlsKeyPairOptions, listenTls } from "ext:deno_net/02_tls.js"; +import { SymbolAsyncDispose } from "ext:deno_web/00_infra.js"; + +const _upgraded = Symbol("_upgraded"); + +function internalServerError() { + // "Internal Server Error" + return new Response( + new Uint8Array([ + 73, + 110, + 116, + 101, + 114, + 110, + 97, + 108, + 32, + 83, + 101, + 114, + 118, + 101, + 114, + 32, + 69, + 114, + 114, + 111, + 114, + ]), + { status: 500 }, + ); +} + +// Used to ensure that user returns a valid response (but not a different response) from handlers that are upgraded. +const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse( + newInnerResponse(101), + "immutable", +); + +function upgradeHttpRaw(req, conn) { + const inner = toInnerRequest(req); + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeHttpRaw", conn); + } + throw new TypeError("upgradeHttpRaw may only be used with Deno.serve"); +} + +function addTrailers(resp, headerList) { + const inner = toInnerResponse(resp); + op_http_set_response_trailers(inner.external, headerList); +} + +class InnerRequest { + #external; + #context; + #methodAndUri; + #streamRid; + #body; + #upgraded; + #urlValue; + #completed; + #abortController; + + constructor(external, context, abortController) { + this.#external = external; + this.#context = context; + this.#upgraded = false; + this.#completed = undefined; + this.#abortController = abortController; + } + + close(success = true) { + // The completion signal fires only if someone cares + if (this.#completed) { + if (success) { + this.#completed.resolve(undefined); + } else { + this.#completed.reject( + new Interrupted("HTTP response was not sent successfully"), + ); + } + } + // Unconditionally abort the request signal. Note that we don't use + // an error here. + this.#abortController.abort(); + this.#external = null; + } + + get [_upgraded]() { + return this.#upgraded; + } + + _wantsUpgrade(upgradeType, ...originalArgs) { + if (this.#upgraded) { + throw new Deno.errors.Http("already upgraded"); + } + if (this.#external === null) { + throw new Deno.errors.Http("already closed"); + } + + // upgradeHttpRaw is sync + if (upgradeType == "upgradeHttpRaw") { + const external = this.#external; + const underlyingConn = originalArgs[0]; + + this.url(); + this.headerList; + this.close(); + + this.#upgraded = () => {}; + + const upgradeRid = op_http_upgrade_raw(external); + + const conn = new TcpConn( + upgradeRid, + underlyingConn?.remoteAddr, + underlyingConn?.localAddr, + ); + + return { response: UPGRADE_RESPONSE_SENTINEL, conn }; + } + + // upgradeWebSocket is sync + if (upgradeType == "upgradeWebSocket") { + const response = originalArgs[0]; + const ws = originalArgs[1]; + + const external = this.#external; + + this.url(); + this.headerList; + this.close(); + + const goAhead = new Deferred(); + this.#upgraded = () => { + goAhead.resolve(); + }; + const wsPromise = op_http_upgrade_websocket_next( + external, + response.headerList, + ); + + // Start the upgrade in the background. + (async () => { + try { + // Returns the upgraded websocket connection + const wsRid = await wsPromise; + + // We have to wait for the go-ahead signal + await goAhead; + + ws[_rid] = wsRid; + ws[_readyState] = WebSocket.OPEN; + ws[_role] = SERVER; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); + } catch (error) { + const event = new ErrorEvent("error", { error }); + ws.dispatchEvent(event); + } + })(); + return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws }; + } + } + + url() { + if (this.#urlValue !== undefined) { + return this.#urlValue; + } + + if (this.#methodAndUri === undefined) { + if (this.#external === null) { + throw new TypeError("request closed"); + } + // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider + // splitting this up into multiple ops. + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); + } + + const path = this.#methodAndUri[2]; + + // * is valid for OPTIONS + if (path === "*") { + return this.#urlValue = "*"; + } + + // If the path is empty, return the authority (valid for CONNECT) + if (path == "") { + return this.#urlValue = this.#methodAndUri[1]; + } + + // CONNECT requires an authority + if (this.#methodAndUri[0] == "CONNECT") { + return this.#urlValue = this.#methodAndUri[1]; + } + + const hostname = this.#methodAndUri[1]; + if (hostname) { + // Construct a URL from the scheme, the hostname, and the path + return this.#urlValue = this.#context.scheme + hostname + path; + } + + // Construct a URL from the scheme, the fallback hostname, and the path + return this.#urlValue = this.#context.scheme + this.#context.fallbackHost + + path; + } + + get completed() { + if (!this.#completed) { + // NOTE: this is faster than Promise.withResolvers() + let resolve, reject; + const promise = new Promise((r1, r2) => { + resolve = r1; + reject = r2; + }); + this.#completed = { promise, resolve, reject }; + } + return this.#completed.promise; + } + + get remoteAddr() { + const transport = this.#context.listener?.addr.transport; + if (transport === "unix" || transport === "unixpacket") { + return { + transport, + path: this.#context.listener.addr.path, + }; + } + if (this.#methodAndUri === undefined) { + if (this.#external === null) { + throw new TypeError("request closed"); + } + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); + } + return { + transport: "tcp", + hostname: this.#methodAndUri[3], + port: this.#methodAndUri[4], + }; + } + + get method() { + if (this.#methodAndUri === undefined) { + if (this.#external === null) { + throw new TypeError("request closed"); + } + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); + } + return this.#methodAndUri[0]; + } + + get body() { + if (this.#external === null) { + throw new TypeError("request closed"); + } + if (this.#body !== undefined) { + return this.#body; + } + // If the method is GET or HEAD, we do not want to include a body here, even if the Rust + // side of the code is willing to provide it to us. + if (this.method == "GET" || this.method == "HEAD") { + this.#body = null; + return null; + } + this.#streamRid = op_http_read_request_body(this.#external); + this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false)); + return this.#body; + } + + get headerList() { + if (this.#external === null) { + throw new TypeError("request closed"); + } + const headers = []; + const reqHeaders = op_http_get_request_headers(this.#external); + for (let i = 0; i < reqHeaders.length; i += 2) { + ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]); + } + return headers; + } + + get external() { + return this.#external; + } +} + +class CallbackContext { + abortController; + scheme; + fallbackHost; + serverRid; + closed; + /** @type {Promise | undefined} */ + closing; + listener; + + constructor(signal, args, listener) { + // The abort signal triggers a non-graceful shutdown + signal?.addEventListener( + "abort", + () => { + op_http_cancel(this.serverRid, false); + }, + { once: true }, + ); + this.abortController = new AbortController(); + this.serverRid = args[0]; + this.scheme = args[1]; + this.fallbackHost = args[2]; + this.closed = false; + this.listener = listener; + } + + close() { + try { + this.closed = true; + core.tryClose(this.serverRid); + } catch { + // Pass + } + } +} + +class ServeHandlerInfo { + #inner: InnerRequest; + constructor(inner: InnerRequest) { + this.#inner = inner; + } + get remoteAddr() { + return this.#inner.remoteAddr; + } + get completed() { + return this.#inner.completed; + } +} + +function fastSyncResponseOrStream( + req, + respBody, + status, + innerRequest: InnerRequest, +) { + if (respBody === null || respBody === undefined) { + // Don't set the body + innerRequest?.close(); + op_http_set_promise_complete(req, status); + return; + } + + const stream = respBody.streamOrStatic; + const body = stream.body; + + if (TypedArrayPrototypeGetSymbolToStringTag(body) === "Uint8Array") { + innerRequest?.close(); + op_http_set_response_body_bytes(req, body, status); + return; + } + + if (typeof body === "string") { + innerRequest?.close(); + op_http_set_response_body_text(req, body, status); + return; + } + + // At this point in the response it needs to be a stream + if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { + innerRequest?.close(); + throw TypeError("invalid response"); + } + const resourceBacking = getReadableStreamResourceBacking(stream); + let rid, autoClose; + if (resourceBacking) { + rid = resourceBacking.rid; + autoClose = resourceBacking.autoClose; + } else { + rid = resourceForReadableStream(stream); + autoClose = true; + } + PromisePrototypeThen( + op_http_set_response_body_resource( + req, + rid, + autoClose, + status, + ), + (success) => { + innerRequest?.close(success); + op_http_close_after_finish(req); + }, + ); +} + +/** + * Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided + * callback, then extracts the response that was returned from that callback. The response is then pulled + * apart and handled on the Rust side. + * + * This function returns a promise that will only reject in the case of abnormal exit. + */ +function mapToCallback(context, callback, onError) { + return async function (req) { + const abortController = new AbortController(); + const signal = abortController.signal; + + // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback + // 500 error. + let innerRequest; + let response; + try { + innerRequest = new InnerRequest(req, context, abortController); + response = await callback( + fromInnerRequest(innerRequest, signal, "immutable"), + new ServeHandlerInfo(innerRequest), + ); + + // Throwing Error if the handler return value is not a Response class + if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { + throw TypeError( + "Return value from serve handler must be a response or a promise resolving to a response", + ); + } + } catch (error) { + try { + response = await onError(error); + if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { + throw TypeError( + "Return value from onError handler must be a response or a promise resolving to a response", + ); + } + } catch (error) { + console.error("Exception in onError while handling exception", error); + response = internalServerError(); + } + } + const inner = toInnerResponse(response); + if (innerRequest?.[_upgraded]) { + // We're done here as the connection has been upgraded during the callback and no longer requires servicing. + if (response !== UPGRADE_RESPONSE_SENTINEL) { + console.error("Upgrade response was not returned from callback"); + context.close(); + } + innerRequest?.[_upgraded](); + return; + } + + // Did everything shut down while we were waiting? + if (context.closed) { + // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate + innerRequest?.close(); + op_http_set_promise_complete(req, 503); + return; + } + + const status = inner.status; + const headers = inner.headerList; + if (headers && headers.length > 0) { + if (headers.length == 1) { + op_http_set_response_header(req, headers[0][0], headers[0][1]); + } else { + op_http_set_response_headers(req, headers); + } + } + + fastSyncResponseOrStream(req, inner.body, status, innerRequest); + }; +} + +type RawHandler = ( + request: Request, + info: ServeHandlerInfo, +) => Response | Promise; + +type RawServeOptions = { + port?: number; + hostname?: string; + signal?: AbortSignal; + reusePort?: boolean; + key?: string; + cert?: string; + onError?: (error: unknown) => Response | Promise; + onListen?: (params: { hostname: string; port: number }) => void; + handler?: RawHandler; +}; + +function serve(arg1, arg2) { + let options: RawServeOptions | undefined; + let handler: RawHandler | undefined; + + if (typeof arg1 === "function") { + handler = arg1; + } else if (typeof arg2 === "function") { + handler = arg2; + options = arg1; + } else { + options = arg1; + } + if (handler === undefined) { + if (options === undefined) { + throw new TypeError( + "No handler was provided, so an options bag is mandatory.", + ); + } + handler = options.handler; + } + if (typeof handler !== "function") { + throw new TypeError("A handler function must be provided."); + } + if (options === undefined) { + options = {}; + } + + const wantsHttps = hasTlsKeyPairOptions(options); + const wantsUnix = ObjectHasOwn(options, "path"); + const signal = options.signal; + const onError = options.onError ?? function (error) { + console.error(error); + return internalServerError(); + }; + + if (wantsUnix) { + const listener = listen({ + transport: "unix", + path: options.path, + [listenOptionApiName]: "Deno.serve", + }); + const path = listener.addr.path; + return serveHttpOnListener(listener, signal, handler, onError, () => { + if (options.onListen) { + options.onListen(listener.addr); + } else { + console.log(`Listening on ${path}`); + } + }); + } + + const listenOpts = { + hostname: options.hostname ?? "0.0.0.0", + port: options.port ?? 8000, + reusePort: options.reusePort ?? false, + }; + + if (options.certFile || options.keyFile) { + throw new TypeError( + "Unsupported 'certFile' / 'keyFile' options provided: use 'cert' / 'key' instead.", + ); + } + if (options.alpnProtocols) { + throw new TypeError( + "Unsupported 'alpnProtocols' option provided. 'h2' and 'http/1.1' are automatically supported.", + ); + } + + let listener; + if (wantsHttps) { + if (!options.cert || !options.key) { + throw new TypeError( + "Both cert and key must be provided to enable HTTPS.", + ); + } + listenOpts.cert = options.cert; + listenOpts.key = options.key; + listenOpts.alpnProtocols = ["h2", "http/1.1"]; + listener = listenTls(listenOpts); + listenOpts.port = listener.addr.port; + } else { + listener = listen(listenOpts); + listenOpts.port = listener.addr.port; + } + + const addr = listener.addr; + // If the hostname is "0.0.0.0", we display "localhost" in console + // because browsers in Windows don't resolve "0.0.0.0". + // See the discussion in https://github.com/denoland/deno_std/issues/1165 + const hostname = addr.hostname == "0.0.0.0" ? "localhost" : addr.hostname; + addr.hostname = hostname; + + const onListen = (scheme) => { + if (options.onListen) { + options.onListen(addr); + } else { + console.log(`Listening on ${scheme}${addr.hostname}:${addr.port}/`); + } + }; + + return serveHttpOnListener(listener, signal, handler, onError, onListen); +} + +/** + * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary listener. + */ +function serveHttpOnListener(listener, signal, handler, onError, onListen) { + const context = new CallbackContext( + signal, + op_http_serve(listener[internalRidSymbol]), + listener, + ); + const callback = mapToCallback(context, handler, onError); + + onListen(context.scheme); + + return serveHttpOn(context, listener.addr, callback); +} + +/** + * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary connection. + */ +function serveHttpOnConnection(connection, signal, handler, onError, onListen) { + const context = new CallbackContext( + signal, + op_http_serve_on(connection[internalRidSymbol]), + null, + ); + const callback = mapToCallback(context, handler, onError); + + onListen(context.scheme); + + return serveHttpOn(context, connection.localAddr, callback); +} + +function serveHttpOn(context, addr, callback) { + let ref = true; + let currentPromise = null; + + const promiseErrorHandler = (error) => { + // Abnormal exit + console.error( + "Terminating Deno.serve loop due to unexpected error", + error, + ); + context.close(); + }; + + // Run the server + const finished = (async () => { + const rid = context.serverRid; + while (true) { + let req; + try { + // Attempt to pull as many requests out of the queue as possible before awaiting. This API is + // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong. + while ((req = op_http_try_wait(rid)) !== null) { + PromisePrototypeCatch(callback(req), promiseErrorHandler); + } + currentPromise = op_http_wait(rid); + if (!ref) { + core.unrefOpPromise(currentPromise); + } + req = await currentPromise; + currentPromise = null; + } catch (error) { + if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { + break; + } + if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) { + break; + } + throw new Deno.errors.Http(error); + } + if (req === null) { + break; + } + PromisePrototypeCatch(callback(req), promiseErrorHandler); + } + + if (!context.closing && !context.closed) { + context.closing = op_http_close(rid, false); + context.close(); + } + + await context.closing; + context.close(); + context.closed = true; + })(); + + return { + addr, + finished, + async shutdown() { + if (!context.closing && !context.closed) { + // Shut this HTTP server down gracefully + context.closing = op_http_close(context.serverRid, true); + } + await context.closing; + context.closed = true; + }, + ref() { + ref = true; + if (currentPromise) { + core.refOpPromise(currentPromise); + } + }, + unref() { + ref = false; + if (currentPromise) { + core.unrefOpPromise(currentPromise); + } + }, + [SymbolAsyncDispose]() { + return this.shutdown(); + }, + }; +} + +internals.addTrailers = addTrailers; +internals.upgradeHttpRaw = upgradeHttpRaw; +internals.serveHttpOnListener = serveHttpOnListener; +internals.serveHttpOnConnection = serveHttpOnConnection; + +export { + addTrailers, + serve, + serveHttpOnConnection, + serveHttpOnListener, + upgradeHttpRaw, +}; diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index a6527397f..9bdb79f86 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -683,7 +683,7 @@ pub async fn op_http_set_response_body_resource( #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, -) -> Result<(), AnyError> { +) -> Result { let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_body_resource") }; @@ -716,8 +716,7 @@ pub async fn op_http_set_response_body_resource( }, ); - http.response_body_finished().await; - Ok(()) + Ok(http.response_body_finished().await) } #[op2(fast)] diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 6fc7207be..934f8a002 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -131,7 +131,7 @@ deno_core::extension!( http_next::op_http_close, http_next::op_http_cancel, ], - esm = ["00_serve.js", "01_http.js", "02_websocket.ts"], + esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], ); pub enum HttpSocketAddr { diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index dac708b96..6b033ffe0 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -16,7 +16,6 @@ use deno_core::Resource; use flate2::write::GzEncoder; use hyper::body::Frame; use hyper::body::SizeHint; -use hyper::header::HeaderMap; use pin_project::pin_project; /// Simplification for nested types we use for our streams. We provide a way to convert from @@ -30,10 +29,6 @@ pub enum ResponseStreamResult { /// not register a waker and should be called again at the lowest level of this code. Generally this /// will only be returned from compression streams that require additional buffering. NoData, - /// Stream provided trailers. - // TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc. - #[allow(unused)] - Trailers(HeaderMap), /// Stream failed. Error(AnyError), } @@ -44,7 +39,6 @@ impl From for Option, AnyError>> { ResponseStreamResult::EndOfStream => None, ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))), ResponseStreamResult::Error(err) => Some(Err(err)), - ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))), // This result should be handled by retrying ResponseStreamResult::NoData => unimplemented!(), } @@ -198,6 +192,11 @@ impl ResponseBytesInner { _ => Self::Bytes(BufView::from(vec)), } } + + /// Did we complete this response successfully? + pub fn is_complete(&self) -> bool { + matches!(self, ResponseBytesInner::Done | ResponseBytesInner::Empty) + } } pub struct ResourceBodyAdapter { @@ -387,9 +386,7 @@ impl PollFrame for GZipResponseStream { let start_out = stm.total_out(); let res = match frame { // Short-circuit these and just return - x @ (ResponseStreamResult::NoData - | ResponseStreamResult::Error(..) - | ResponseStreamResult::Trailers(..)) => { + x @ (ResponseStreamResult::NoData | ResponseStreamResult::Error(..)) => { return std::task::Poll::Ready(x) } ResponseStreamResult::EndOfStream => { diff --git a/ext/http/service.rs b/ext/http/service.rs index 932575e37..f38fec4f4 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -482,12 +482,13 @@ impl HttpRecord { HttpRecordReady(self) } - /// Resolves when response body has finished streaming. - pub fn response_body_finished(&self) -> impl Future + '_ { + /// Resolves when response body has finished streaming. Returns true if the + /// response completed. + pub fn response_body_finished(&self) -> impl Future + '_ { struct HttpRecordFinished<'a>(&'a HttpRecord); impl<'a> Future for HttpRecordFinished<'a> { - type Output = (); + type Output = bool; fn poll( self: Pin<&mut Self>, @@ -495,7 +496,10 @@ impl HttpRecord { ) -> Poll { let mut mut_self = self.0.self_mut(); if mut_self.response_body_finished { - return Poll::Ready(()); + // If we sent the response body and the trailers, this body completed successfully + return Poll::Ready( + mut_self.response_body.is_complete() && mut_self.trailers.is_none(), + ); } mut_self.response_body_waker = Some(cx.waker().clone()); Poll::Pending diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index ceaf7aeb8..07ef66146 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -59,7 +59,7 @@ import { ERR_UNESCAPED_CHARACTERS, } from "ext:deno_node/internal/errors.ts"; import { getTimerDuration } from "ext:deno_node/internal/timers.mjs"; -import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js"; +import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts"; import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 023b6acd3..02e66e3da 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -36,7 +36,7 @@ import { } from "ext:deno_node/internal/stream_base_commons.ts"; import { FileHandle } from "node:fs/promises"; import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; -import { serveHttpOnConnection } from "ext:deno_http/00_serve.js"; +import { serveHttpOnConnection } from "ext:deno_http/00_serve.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts"; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; import { Duplex } from "node:stream"; diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 96799cb09..02ac7b602 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -13,7 +13,7 @@ import * as console from "ext:deno_console/01_console.js"; import * as ffi from "ext:deno_ffi/00_ffi.js"; import * as net from "ext:deno_net/01_net.js"; import * as tls from "ext:deno_net/02_tls.js"; -import * as serve from "ext:deno_http/00_serve.js"; +import * as serve from "ext:deno_http/00_serve.ts"; import * as http from "ext:deno_http/01_http.js"; import * as websocket from "ext:deno_http/02_websocket.ts"; import * as errors from "ext:runtime/01_errors.js"; diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts index 048529ae9..32d05056a 100644 --- a/tests/unit/serve_test.ts +++ b/tests/unit/serve_test.ts @@ -2843,7 +2843,20 @@ Deno.test( async function httpServerCancelFetch() { const request2 = Promise.withResolvers(); const request2Aborted = Promise.withResolvers(); - const { finished, abort } = await makeServer(async (req) => { + let completed = 0; + let aborted = 0; + const { finished, abort } = await makeServer(async (req, context) => { + context.completed.then(() => { + console.log("completed"); + completed++; + }).catch(() => { + console.log("completed (error)"); + completed++; + }); + req.signal.onabort = () => { + console.log("aborted", req.url); + aborted++; + }; if (req.url.endsWith("/1")) { const fetchRecursive = await fetch(`http://localhost:${servePort}/2`); return new Response(fetchRecursive.body); @@ -2871,6 +2884,8 @@ Deno.test( abort(); await finished; + assertEquals(completed, 2); + assertEquals(aborted, 2); }, ); diff --git a/tools/core_import_map.json b/tools/core_import_map.json index 463095de8..421769e52 100644 --- a/tools/core_import_map.json +++ b/tools/core_import_map.json @@ -15,7 +15,7 @@ "ext:deno_fetch/26_fetch.js": "../ext/fetch/26_fetch.js", "ext:deno_ffi/00_ffi.js": "../ext/ffi/00_ffi.js", "ext:deno_fs/30_fs.js": "../ext/fs/30_fs.js", - "ext:deno_http/00_serve.js": "../ext/http/00_serve.js", + "ext:deno_http/00_serve.ts": "../ext/http/00_serve.ts", "ext:deno_http/01_http.js": "../ext/http/01_http.js", "ext:deno_io/12_io.js": "../ext/io/12_io.js", "ext:deno_kv/01_db.ts": "../ext/kv/01_db.ts", -- cgit v1.2.3