summaryrefslogtreecommitdiff
path: root/ext/http/00_serve.js
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-04-24 14:03:37 -0400
committerGitHub <noreply@github.com>2024-04-24 14:03:37 -0400
commiteed2598e6cf1db643b4edd07b5eff94c59eb9408 (patch)
tree0320981bba82c78647b9cf335793381400093ad9 /ext/http/00_serve.js
parentb60822f6e0e3c1f3e360657cfb67c114df2e7032 (diff)
feat(ext/http): Implement request.signal for Deno.serve (#23425)
When the response has been successfully send, we abort the `Request.signal` property to indicate that all resources associated with this transaction may be torn down.
Diffstat (limited to 'ext/http/00_serve.js')
-rw-r--r--ext/http/00_serve.js741
1 files changed, 0 insertions, 741 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
deleted file mode 100644
index 52b833f10..000000000
--- a/ext/http/00_serve.js
+++ /dev/null
@@ -1,741 +0,0 @@
-// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-
-import { core, internals, primordials } from "ext:core/mod.js";
-const {
- BadResourcePrototype,
- InterruptedPrototype,
- internalRidSymbol,
-} = core;
-import {
- op_http_cancel,
- op_http_close,
- op_http_close_after_finish,
- op_http_get_request_headers,
- op_http_get_request_method_and_url,
- op_http_read_request_body,
- op_http_serve,
- op_http_serve_on,
- op_http_set_promise_complete,
- op_http_set_response_body_bytes,
- op_http_set_response_body_resource,
- op_http_set_response_body_text,
- op_http_set_response_header,
- op_http_set_response_headers,
- op_http_set_response_trailers,
- op_http_try_wait,
- op_http_upgrade_raw,
- op_http_upgrade_websocket_next,
- op_http_wait,
-} from "ext:core/ops";
-const {
- ArrayPrototypePush,
- ObjectHasOwn,
- ObjectPrototypeIsPrototypeOf,
- PromisePrototypeCatch,
- PromisePrototypeThen,
- Symbol,
- TypeError,
- TypedArrayPrototypeGetSymbolToStringTag,
- Uint8Array,
-} = primordials;
-
-import { InnerBody } from "ext:deno_fetch/22_body.js";
-import { Event } from "ext:deno_web/02_event.js";
-import {
- fromInnerResponse,
- newInnerResponse,
- ResponsePrototype,
- toInnerResponse,
-} from "ext:deno_fetch/23_response.js";
-import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js";
-import { AbortController } from "ext:deno_web/03_abort_signal.js";
-import {
- _eventLoop,
- _idleTimeoutDuration,
- _idleTimeoutTimeout,
- _protocol,
- _readyState,
- _rid,
- _role,
- _server,
- _serverHandleIdleTimeout,
- SERVER,
- WebSocket,
-} from "ext:deno_websocket/01_websocket.js";
-import {
- Deferred,
- getReadableStreamResourceBacking,
- readableStreamForRid,
- ReadableStreamPrototype,
- resourceForReadableStream,
-} from "ext:deno_web/06_streams.js";
-import { listen, listenOptionApiName, TcpConn } from "ext:deno_net/01_net.js";
-import { hasTlsKeyPairOptions, listenTls } from "ext:deno_net/02_tls.js";
-import { SymbolAsyncDispose } from "ext:deno_web/00_infra.js";
-
-const _upgraded = Symbol("_upgraded");
-
-function internalServerError() {
- // "Internal Server Error"
- return new Response(
- new Uint8Array([
- 73,
- 110,
- 116,
- 101,
- 114,
- 110,
- 97,
- 108,
- 32,
- 83,
- 101,
- 114,
- 118,
- 101,
- 114,
- 32,
- 69,
- 114,
- 114,
- 111,
- 114,
- ]),
- { status: 500 },
- );
-}
-
-// Used to ensure that user returns a valid response (but not a different response) from handlers that are upgraded.
-const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse(
- newInnerResponse(101),
- "immutable",
-);
-
-function upgradeHttpRaw(req, conn) {
- const inner = toInnerRequest(req);
- if (inner._wantsUpgrade) {
- return inner._wantsUpgrade("upgradeHttpRaw", conn);
- }
- throw new TypeError("upgradeHttpRaw may only be used with Deno.serve");
-}
-
-function addTrailers(resp, headerList) {
- const inner = toInnerResponse(resp);
- op_http_set_response_trailers(inner.external, headerList);
-}
-
-class InnerRequest {
- #external;
- #context;
- #methodAndUri;
- #streamRid;
- #body;
- #upgraded;
- #urlValue;
-
- constructor(external, context) {
- this.#external = external;
- this.#context = context;
- this.#upgraded = false;
- }
-
- close() {
- this.#external = null;
- }
-
- get [_upgraded]() {
- return this.#upgraded;
- }
-
- _wantsUpgrade(upgradeType, ...originalArgs) {
- if (this.#upgraded) {
- throw new Deno.errors.Http("already upgraded");
- }
- if (this.#external === null) {
- throw new Deno.errors.Http("already closed");
- }
-
- // upgradeHttpRaw is sync
- if (upgradeType == "upgradeHttpRaw") {
- const external = this.#external;
- const underlyingConn = originalArgs[0];
-
- this.url();
- this.headerList;
- this.close();
-
- this.#upgraded = () => {};
-
- const upgradeRid = op_http_upgrade_raw(external);
-
- const conn = new TcpConn(
- upgradeRid,
- underlyingConn?.remoteAddr,
- underlyingConn?.localAddr,
- );
-
- return { response: UPGRADE_RESPONSE_SENTINEL, conn };
- }
-
- // upgradeWebSocket is sync
- if (upgradeType == "upgradeWebSocket") {
- const response = originalArgs[0];
- const ws = originalArgs[1];
-
- const external = this.#external;
-
- this.url();
- this.headerList;
- this.close();
-
- const goAhead = new Deferred();
- this.#upgraded = () => {
- goAhead.resolve();
- };
- const wsPromise = op_http_upgrade_websocket_next(
- external,
- response.headerList,
- );
-
- // Start the upgrade in the background.
- (async () => {
- try {
- // Returns the upgraded websocket connection
- const wsRid = await wsPromise;
-
- // We have to wait for the go-ahead signal
- await goAhead;
-
- ws[_rid] = wsRid;
- ws[_readyState] = WebSocket.OPEN;
- ws[_role] = SERVER;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- if (ws[_idleTimeoutDuration]) {
- ws.addEventListener(
- "close",
- () => clearTimeout(ws[_idleTimeoutTimeout]),
- );
- }
- ws[_serverHandleIdleTimeout]();
- } catch (error) {
- const event = new ErrorEvent("error", { error });
- ws.dispatchEvent(event);
- }
- })();
- return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws };
- }
- }
-
- url() {
- if (this.#urlValue !== undefined) {
- return this.#urlValue;
- }
-
- if (this.#methodAndUri === undefined) {
- if (this.#external === null) {
- throw new TypeError("request closed");
- }
- // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider
- // splitting this up into multiple ops.
- this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
- }
-
- const path = this.#methodAndUri[2];
-
- // * is valid for OPTIONS
- if (path === "*") {
- return this.#urlValue = "*";
- }
-
- // If the path is empty, return the authority (valid for CONNECT)
- if (path == "") {
- return this.#urlValue = this.#methodAndUri[1];
- }
-
- // CONNECT requires an authority
- if (this.#methodAndUri[0] == "CONNECT") {
- return this.#urlValue = this.#methodAndUri[1];
- }
-
- const hostname = this.#methodAndUri[1];
- if (hostname) {
- // Construct a URL from the scheme, the hostname, and the path
- return this.#urlValue = this.#context.scheme + hostname + path;
- }
-
- // Construct a URL from the scheme, the fallback hostname, and the path
- return this.#urlValue = this.#context.scheme + this.#context.fallbackHost +
- path;
- }
-
- get remoteAddr() {
- const transport = this.#context.listener?.addr.transport;
- if (transport === "unix" || transport === "unixpacket") {
- return {
- transport,
- path: this.#context.listener.addr.path,
- };
- }
- if (this.#methodAndUri === undefined) {
- if (this.#external === null) {
- throw new TypeError("request closed");
- }
- this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
- }
- return {
- transport: "tcp",
- hostname: this.#methodAndUri[3],
- port: this.#methodAndUri[4],
- };
- }
-
- get method() {
- if (this.#methodAndUri === undefined) {
- if (this.#external === null) {
- throw new TypeError("request closed");
- }
- this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
- }
- return this.#methodAndUri[0];
- }
-
- get body() {
- if (this.#external === null) {
- throw new TypeError("request closed");
- }
- if (this.#body !== undefined) {
- return this.#body;
- }
- // If the method is GET or HEAD, we do not want to include a body here, even if the Rust
- // side of the code is willing to provide it to us.
- if (this.method == "GET" || this.method == "HEAD") {
- this.#body = null;
- return null;
- }
- this.#streamRid = op_http_read_request_body(this.#external);
- this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false));
- return this.#body;
- }
-
- get headerList() {
- if (this.#external === null) {
- throw new TypeError("request closed");
- }
- const headers = [];
- const reqHeaders = op_http_get_request_headers(this.#external);
- for (let i = 0; i < reqHeaders.length; i += 2) {
- ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]);
- }
- return headers;
- }
-
- get external() {
- return this.#external;
- }
-}
-
-class CallbackContext {
- abortController;
- scheme;
- fallbackHost;
- serverRid;
- closed;
- /** @type {Promise<void> | undefined} */
- closing;
- listener;
-
- constructor(signal, args, listener) {
- // The abort signal triggers a non-graceful shutdown
- signal?.addEventListener(
- "abort",
- () => {
- op_http_cancel(this.serverRid, false);
- },
- { once: true },
- );
- this.abortController = new AbortController();
- this.serverRid = args[0];
- this.scheme = args[1];
- this.fallbackHost = args[2];
- this.closed = false;
- this.listener = listener;
- }
-
- close() {
- try {
- this.closed = true;
- core.tryClose(this.serverRid);
- } catch {
- // Pass
- }
- }
-}
-
-class ServeHandlerInfo {
- #inner = null;
- constructor(inner) {
- this.#inner = inner;
- }
- get remoteAddr() {
- return this.#inner.remoteAddr;
- }
-}
-
-function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
- if (respBody === null || respBody === undefined) {
- // Don't set the body
- innerRequest?.close();
- op_http_set_promise_complete(req, status);
- return;
- }
-
- const stream = respBody.streamOrStatic;
- const body = stream.body;
-
- if (TypedArrayPrototypeGetSymbolToStringTag(body) === "Uint8Array") {
- innerRequest?.close();
- op_http_set_response_body_bytes(req, body, status);
- return;
- }
-
- if (typeof body === "string") {
- innerRequest?.close();
- op_http_set_response_body_text(req, body, status);
- return;
- }
-
- // At this point in the response it needs to be a stream
- if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
- innerRequest?.close();
- throw TypeError("invalid response");
- }
- const resourceBacking = getReadableStreamResourceBacking(stream);
- let rid, autoClose;
- if (resourceBacking) {
- rid = resourceBacking.rid;
- autoClose = resourceBacking.autoClose;
- } else {
- rid = resourceForReadableStream(stream);
- autoClose = true;
- }
- PromisePrototypeThen(
- op_http_set_response_body_resource(
- req,
- rid,
- autoClose,
- status,
- ),
- () => {
- innerRequest?.close();
- op_http_close_after_finish(req);
- },
- );
-}
-
-/**
- * Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided
- * callback, then extracts the response that was returned from that callback. The response is then pulled
- * apart and handled on the Rust side.
- *
- * This function returns a promise that will only reject in the case of abnormal exit.
- */
-function mapToCallback(context, callback, onError) {
- const signal = context.abortController.signal;
-
- return async function (req) {
- // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
- // 500 error.
- let innerRequest;
- let response;
- try {
- innerRequest = new InnerRequest(req, context);
- response = await callback(
- fromInnerRequest(innerRequest, signal, "immutable"),
- new ServeHandlerInfo(innerRequest),
- );
-
- // Throwing Error if the handler return value is not a Response class
- if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
- throw TypeError(
- "Return value from serve handler must be a response or a promise resolving to a response",
- );
- }
- } catch (error) {
- try {
- response = await onError(error);
- if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
- throw TypeError(
- "Return value from onError handler must be a response or a promise resolving to a response",
- );
- }
- } catch (error) {
- console.error("Exception in onError while handling exception", error);
- response = internalServerError();
- }
- }
- const inner = toInnerResponse(response);
- if (innerRequest?.[_upgraded]) {
- // We're done here as the connection has been upgraded during the callback and no longer requires servicing.
- if (response !== UPGRADE_RESPONSE_SENTINEL) {
- console.error("Upgrade response was not returned from callback");
- context.close();
- }
- innerRequest?.[_upgraded]();
- return;
- }
-
- // Did everything shut down while we were waiting?
- if (context.closed) {
- // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
- innerRequest?.close();
- op_http_set_promise_complete(req, 503);
- return;
- }
-
- const status = inner.status;
- const headers = inner.headerList;
- if (headers && headers.length > 0) {
- if (headers.length == 1) {
- op_http_set_response_header(req, headers[0][0], headers[0][1]);
- } else {
- op_http_set_response_headers(req, headers);
- }
- }
-
- fastSyncResponseOrStream(req, inner.body, status, innerRequest);
- };
-}
-
-function serve(arg1, arg2) {
- let options = undefined;
- let handler = undefined;
- if (typeof arg1 === "function") {
- handler = arg1;
- } else if (typeof arg2 === "function") {
- handler = arg2;
- options = arg1;
- } else {
- options = arg1;
- }
- if (handler === undefined) {
- if (options === undefined) {
- throw new TypeError(
- "No handler was provided, so an options bag is mandatory.",
- );
- }
- handler = options.handler;
- }
- if (typeof handler !== "function") {
- throw new TypeError("A handler function must be provided.");
- }
- if (options === undefined) {
- options = {};
- }
-
- const wantsHttps = hasTlsKeyPairOptions(options);
- const wantsUnix = ObjectHasOwn(options, "path");
- const signal = options.signal;
- const onError = options.onError ?? function (error) {
- console.error(error);
- return internalServerError();
- };
-
- if (wantsUnix) {
- const listener = listen({
- transport: "unix",
- path: options.path,
- [listenOptionApiName]: "Deno.serve",
- });
- const path = listener.addr.path;
- return serveHttpOnListener(listener, signal, handler, onError, () => {
- if (options.onListen) {
- options.onListen(listener.addr);
- } else {
- console.log(`Listening on ${path}`);
- }
- });
- }
-
- const listenOpts = {
- hostname: options.hostname ?? "0.0.0.0",
- port: options.port ?? 8000,
- reusePort: options.reusePort ?? false,
- };
-
- if (options.certFile || options.keyFile) {
- throw new TypeError(
- "Unsupported 'certFile' / 'keyFile' options provided: use 'cert' / 'key' instead.",
- );
- }
- if (options.alpnProtocols) {
- throw new TypeError(
- "Unsupported 'alpnProtocols' option provided. 'h2' and 'http/1.1' are automatically supported.",
- );
- }
-
- let listener;
- if (wantsHttps) {
- if (!options.cert || !options.key) {
- throw new TypeError(
- "Both cert and key must be provided to enable HTTPS.",
- );
- }
- listenOpts.cert = options.cert;
- listenOpts.key = options.key;
- listenOpts.alpnProtocols = ["h2", "http/1.1"];
- listener = listenTls(listenOpts);
- listenOpts.port = listener.addr.port;
- } else {
- listener = listen(listenOpts);
- listenOpts.port = listener.addr.port;
- }
-
- const addr = listener.addr;
- // If the hostname is "0.0.0.0", we display "localhost" in console
- // because browsers in Windows don't resolve "0.0.0.0".
- // See the discussion in https://github.com/denoland/deno_std/issues/1165
- const hostname = addr.hostname == "0.0.0.0" ? "localhost" : addr.hostname;
- addr.hostname = hostname;
-
- const onListen = (scheme) => {
- if (options.onListen) {
- options.onListen(addr);
- } else {
- console.log(`Listening on ${scheme}${addr.hostname}:${addr.port}/`);
- }
- };
-
- return serveHttpOnListener(listener, signal, handler, onError, onListen);
-}
-
-/**
- * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary listener.
- */
-function serveHttpOnListener(listener, signal, handler, onError, onListen) {
- const context = new CallbackContext(
- signal,
- op_http_serve(listener[internalRidSymbol]),
- listener,
- );
- const callback = mapToCallback(context, handler, onError);
-
- onListen(context.scheme);
-
- return serveHttpOn(context, listener.addr, callback);
-}
-
-/**
- * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary connection.
- */
-function serveHttpOnConnection(connection, signal, handler, onError, onListen) {
- const context = new CallbackContext(
- signal,
- op_http_serve_on(connection[internalRidSymbol]),
- null,
- );
- const callback = mapToCallback(context, handler, onError);
-
- onListen(context.scheme);
-
- return serveHttpOn(context, connection.localAddr, callback);
-}
-
-function serveHttpOn(context, addr, callback) {
- let ref = true;
- let currentPromise = null;
-
- const promiseErrorHandler = (error) => {
- // Abnormal exit
- console.error(
- "Terminating Deno.serve loop due to unexpected error",
- error,
- );
- context.close();
- };
-
- // Run the server
- const finished = (async () => {
- const rid = context.serverRid;
- while (true) {
- let req;
- try {
- // Attempt to pull as many requests out of the queue as possible before awaiting. This API is
- // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
- while ((req = op_http_try_wait(rid)) !== null) {
- PromisePrototypeCatch(callback(req), promiseErrorHandler);
- }
- currentPromise = op_http_wait(rid);
- if (!ref) {
- core.unrefOpPromise(currentPromise);
- }
- req = await currentPromise;
- currentPromise = null;
- } catch (error) {
- if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
- break;
- }
- if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) {
- break;
- }
- throw new Deno.errors.Http(error);
- }
- if (req === null) {
- break;
- }
- PromisePrototypeCatch(callback(req), promiseErrorHandler);
- }
-
- if (!context.closing && !context.closed) {
- context.closing = op_http_close(rid, false);
- context.close();
- }
-
- await context.closing;
- context.close();
- context.closed = true;
- })();
-
- return {
- addr,
- finished,
- async shutdown() {
- if (!context.closing && !context.closed) {
- // Shut this HTTP server down gracefully
- context.closing = op_http_close(context.serverRid, true);
- }
- await context.closing;
- context.closed = true;
- },
- ref() {
- ref = true;
- if (currentPromise) {
- core.refOpPromise(currentPromise);
- }
- },
- unref() {
- ref = false;
- if (currentPromise) {
- core.unrefOpPromise(currentPromise);
- }
- },
- [SymbolAsyncDispose]() {
- return this.shutdown();
- },
- };
-}
-
-internals.addTrailers = addTrailers;
-internals.upgradeHttpRaw = upgradeHttpRaw;
-internals.serveHttpOnListener = serveHttpOnListener;
-internals.serveHttpOnConnection = serveHttpOnConnection;
-
-export {
- addTrailers,
- serve,
- serveHttpOnConnection,
- serveHttpOnListener,
- upgradeHttpRaw,
-};