summaryrefslogtreecommitdiff
path: root/ext/flash/01_http.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/flash/01_http.js')
-rw-r--r--ext/flash/01_http.js793
1 files changed, 0 insertions, 793 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
deleted file mode 100644
index fe503ed05..000000000
--- a/ext/flash/01_http.js
+++ /dev/null
@@ -1,793 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-const core = globalThis.Deno.core;
-const ops = core.ops;
-const primordials = globalThis.__bootstrap.primordials;
-import { BlobPrototype } from "ext:deno_web/09_file.js";
-import { TcpConn } from "ext:deno_net/01_net.js";
-import { toInnerResponse } from "ext:deno_fetch/23_response.js";
-import { _flash, fromFlashRequest } from "ext:deno_fetch/23_request.js";
-import { Event } from "ext:deno_web/02_event.js";
-import {
- _state,
- getReadableStreamResourceBacking,
- ReadableStream,
- readableStreamClose,
- ReadableStreamPrototype,
-} from "ext:deno_web/06_streams.js";
-import {
- _eventLoop,
- _idleTimeoutDuration,
- _idleTimeoutTimeout,
- _protocol,
- _readyState,
- _rid,
- _serverHandleIdleTimeout,
- WebSocket,
-} from "ext:deno_websocket/01_websocket.js";
-import { _ws } from "ext:deno_http/01_http.js";
-const {
- ObjectPrototypeIsPrototypeOf,
- PromisePrototype,
- PromisePrototypeCatch,
- PromisePrototypeThen,
- SafePromiseAll,
- TypedArrayPrototypeGetByteLength,
- TypedArrayPrototypeGetSymbolToStringTag,
- TypedArrayPrototypeSet,
- TypedArrayPrototypeSubarray,
- TypeError,
- Uint8Array,
-} = primordials;
-
-const statusCodes = {
- 100: "Continue",
- 101: "Switching Protocols",
- 102: "Processing",
- 200: "OK",
- 201: "Created",
- 202: "Accepted",
- 203: "Non Authoritative Information",
- 204: "No Content",
- 205: "Reset Content",
- 206: "Partial Content",
- 207: "Multi-Status",
- 208: "Already Reported",
- 226: "IM Used",
- 300: "Multiple Choices",
- 301: "Moved Permanently",
- 302: "Found",
- 303: "See Other",
- 304: "Not Modified",
- 305: "Use Proxy",
- 307: "Temporary Redirect",
- 308: "Permanent Redirect",
- 400: "Bad Request",
- 401: "Unauthorized",
- 402: "Payment Required",
- 403: "Forbidden",
- 404: "Not Found",
- 405: "Method Not Allowed",
- 406: "Not Acceptable",
- 407: "Proxy Authentication Required",
- 408: "Request Timeout",
- 409: "Conflict",
- 410: "Gone",
- 411: "Length Required",
- 412: "Precondition Failed",
- 413: "Payload Too Large",
- 414: "URI Too Long",
- 415: "Unsupported Media Type",
- 416: "Range Not Satisfiable",
- 418: "I'm a teapot",
- 421: "Misdirected Request",
- 422: "Unprocessable Entity",
- 423: "Locked",
- 424: "Failed Dependency",
- 426: "Upgrade Required",
- 428: "Precondition Required",
- 429: "Too Many Requests",
- 431: "Request Header Fields Too Large",
- 451: "Unavailable For Legal Reasons",
- 500: "Internal Server Error",
- 501: "Not Implemented",
- 502: "Bad Gateway",
- 503: "Service Unavailable",
- 504: "Gateway Timeout",
- 505: "HTTP Version Not Supported",
- 506: "Variant Also Negotiates",
- 507: "Insufficient Storage",
- 508: "Loop Detected",
- 510: "Not Extended",
- 511: "Network Authentication Required",
-};
-
-const methods = {
- 0: "GET",
- 1: "HEAD",
- 2: "CONNECT",
- 3: "PUT",
- 4: "DELETE",
- 5: "OPTIONS",
- 6: "TRACE",
- 7: "POST",
- 8: "PATCH",
-};
-
-let dateInterval;
-let date;
-
-/**
- * Construct an HTTP response message.
- * All HTTP/1.1 messages consist of a start-line followed by a sequence
- * of octets.
- *
- * HTTP-message = start-line
- * *( header-field CRLF )
- * CRLF
- * [ message-body ]
- *
- * @param {keyof typeof methods} method
- * @param {keyof typeof statusCodes} status
- * @param {[name: string, value: string][]} headerList
- * @param {Uint8Array | string | null} body
- * @param {number} bodyLen
- * @param {boolean} earlyEnd
- * @returns {Uint8Array | string}
- */
-function http1Response(
- method,
- status,
- headerList,
- body,
- bodyLen,
- earlyEnd = false,
-) {
- // HTTP uses a "<major>.<minor>" numbering scheme
- // HTTP-version = HTTP-name "/" DIGIT "." DIGIT
- // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive
- //
- // status-line = HTTP-version SP status-code SP reason-phrase CRLF
- // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
- let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`;
- for (let i = 0; i < headerList.length; ++i) {
- const { 0: name, 1: value } = headerList[i];
- // header-field = field-name ":" OWS field-value OWS
- str += `${name}: ${value}\r\n`;
- }
-
- // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
- if (status === 205 || status === 304) {
- // MUST NOT generate a payload in a 205 response.
- // indicate a zero-length body for the response by
- // including a Content-Length header field with a value of 0.
- str += "Content-Length: 0\r\n\r\n";
- return str;
- }
-
- // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
- if (status === 204 || status < 200) {
- str += "\r\n";
- return str;
- }
-
- if (earlyEnd === true) {
- return str;
- }
-
- // null body status is validated by inititalizeAResponse in ext/fetch
- if (body !== null && body !== undefined) {
- str += `Content-Length: ${bodyLen}\r\n\r\n`;
- } else {
- str += "Transfer-Encoding: chunked\r\n\r\n";
- return str;
- }
-
- // A HEAD request.
- if (method === 1) return str;
-
- if (typeof body === "string") {
- str += body ?? "";
- } else {
- const head = core.encode(str);
- const response = new Uint8Array(
- TypedArrayPrototypeGetByteLength(head) + bodyLen,
- );
- TypedArrayPrototypeSet(response, head, 0);
- TypedArrayPrototypeSet(
- response,
- body,
- TypedArrayPrototypeGetByteLength(head),
- );
- return response;
- }
-
- return str;
-}
-
-function prepareFastCalls() {
- return ops.op_flash_make_request();
-}
-
-function hostnameForDisplay(hostname) {
- // If the hostname is "0.0.0.0", we display "localhost" in console
- // because browsers in Windows don't resolve "0.0.0.0".
- // See the discussion in https://github.com/denoland/deno_std/issues/1165
- return hostname === "0.0.0.0" ? "localhost" : hostname;
-}
-
-function writeFixedResponse(
- server,
- requestId,
- response,
- responseLen,
- end,
- respondFast,
-) {
- let nwritten = 0;
- // TypedArray
- if (typeof response !== "string") {
- nwritten = respondFast(requestId, response, end);
- } else {
- // string
- nwritten = ops.op_flash_respond(
- server,
- requestId,
- response,
- end,
- );
- }
-
- if (nwritten < responseLen) {
- core.opAsync(
- "op_flash_respond_async",
- server,
- requestId,
- response.slice(nwritten),
- end,
- );
- }
-}
-
-// TODO(@littledivy): Woah woah, cut down the number of arguments.
-async function handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
-) {
- // there might've been an HTTP upgrade.
- if (resp === undefined) {
- return;
- }
- const innerResp = toInnerResponse(resp);
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | string | null} */
- let respBody = null;
- let isStreamingResponseBody = false;
- if (innerResp.body !== null) {
- if (typeof innerResp.body.streamOrStatic?.body === "string") {
- if (innerResp.body.streamOrStatic.consumed === true) {
- throw new TypeError("Body is unusable.");
- }
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- isStreamingResponseBody = false;
- } else if (
- ObjectPrototypeIsPrototypeOf(
- ReadableStreamPrototype,
- innerResp.body.streamOrStatic,
- )
- ) {
- if (innerResp.body.unusable()) {
- throw new TypeError("Body is unusable.");
- }
- if (
- innerResp.body.length === null ||
- ObjectPrototypeIsPrototypeOf(
- BlobPrototype,
- innerResp.body.source,
- )
- ) {
- respBody = innerResp.body.stream;
- } else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
- } else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
- }
- isStreamingResponseBody = !(
- typeof respBody === "string" ||
- TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array"
- );
- } else {
- if (innerResp.body.streamOrStatic.consumed === true) {
- throw new TypeError("Body is unusable.");
- }
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- }
- } else {
- respBody = new Uint8Array(0);
- }
-
- const ws = resp[_ws];
- if (isStreamingResponseBody === false) {
- const length = typeof respBody === "string"
- ? core.byteLength(respBody)
- : TypedArrayPrototypeGetByteLength(respBody);
- const responseStr = http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- respBody,
- length,
- );
- // A HEAD request always ignores body, but includes the correct content-length size.
- const responseLen = method === 1 ? core.byteLength(responseStr) : length;
- writeFixedResponse(
- serverId,
- i,
- responseStr,
- responseLen,
- !ws, // Don't close socket if there is a deferred websocket upgrade.
- respondFast,
- );
- }
-
- return (async () => {
- if (!ws) {
- if (hasBody && body[_state] !== "closed") {
- // TODO(@littledivy): Optimize by draining in a single op.
- try {
- await req.arrayBuffer();
- } catch { /* pass */ }
- }
- }
-
- if (isStreamingResponseBody === true) {
- const resourceBacking = getReadableStreamResourceBacking(respBody);
- if (resourceBacking) {
- if (respBody.locked) {
- throw new TypeError("ReadableStream is locked.");
- }
- const reader = respBody.getReader(); // Aquire JS lock.
- try {
- PromisePrototypeThen(
- core.opAsync(
- "op_flash_write_resource",
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- 0, // Content-Length will be set by the op.
- true,
- ),
- serverId,
- i,
- resourceBacking.rid,
- resourceBacking.autoClose,
- ),
- () => {
- // Release JS lock.
- readableStreamClose(respBody);
- },
- );
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
- } else {
- const reader = respBody.getReader();
-
- // Best case: sends headers + first chunk in a single go.
- const { value, done } = await reader.read();
- writeFixedResponse(
- serverId,
- i,
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- // deno-lint-ignore prefer-primordials
- respBody.byteLength,
- ),
- // deno-lint-ignore prefer-primordials
- respBody.byteLength,
- false,
- respondFast,
- );
-
- await tryRespondChunked(
- i,
- value,
- done,
- );
-
- if (!done) {
- while (true) {
- const chunk = await reader.read();
- await respondChunked(
- i,
- chunk.value,
- chunk.done,
- );
- if (chunk.done) break;
- }
- }
- }
- }
-
- if (ws) {
- const wsRid = await core.opAsync(
- "op_flash_upgrade_websocket",
- serverId,
- i,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- if (ws[_idleTimeoutDuration]) {
- ws.addEventListener(
- "close",
- () => clearTimeout(ws[_idleTimeoutTimeout]),
- );
- }
- ws[_serverHandleIdleTimeout]();
- }
- })();
-}
-
-function createServe(opFn) {
- return async function serve(arg1, arg2) {
- let options = undefined;
- let handler = undefined;
- if (typeof arg1 === "function") {
- handler = arg1;
- options = arg2;
- } else if (typeof arg2 === "function") {
- handler = arg2;
- options = arg1;
- } else {
- options = arg1;
- }
- if (handler === undefined) {
- if (options === undefined) {
- throw new TypeError(
- "No handler was provided, so an options bag is mandatory.",
- );
- }
- handler = options.handler;
- }
- if (typeof handler !== "function") {
- throw new TypeError("A handler function must be provided.");
- }
- if (options === undefined) {
- options = {};
- }
-
- const signal = options.signal;
-
- const onError = options.onError ?? function (error) {
- console.error(error);
- return new Response("Internal Server Error", { status: 500 });
- };
-
- const onListen = options.onListen ?? function ({ port }) {
- console.log(
- `Listening on http://${
- hostnameForDisplay(listenOpts.hostname)
- }:${port}/`,
- );
- };
-
- const listenOpts = {
- hostname: options.hostname ?? "127.0.0.1",
- port: options.port ?? 9000,
- reuseport: options.reusePort ?? false,
- };
- if (options.cert || options.key) {
- if (!options.cert || !options.key) {
- throw new TypeError(
- "Both cert and key must be provided to enable HTTPS.",
- );
- }
- listenOpts.cert = options.cert;
- listenOpts.key = options.key;
- }
-
- const serverId = opFn(listenOpts);
- const serverPromise = core.opAsync("op_flash_drive_server", serverId);
-
- PromisePrototypeCatch(
- PromisePrototypeThen(
- core.opAsync("op_flash_wait_for_listening", serverId),
- (port) => {
- onListen({ hostname: listenOpts.hostname, port });
- },
- ),
- () => {},
- );
- const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
-
- const server = {
- id: serverId,
- transport: listenOpts.cert && listenOpts.key ? "https" : "http",
- hostname: listenOpts.hostname,
- port: listenOpts.port,
- closed: false,
- finished: finishedPromise,
- async close() {
- if (server.closed) {
- return;
- }
- server.closed = true;
- await core.opAsync("op_flash_close_server", serverId);
- await server.finished;
- },
- async serve() {
- let offset = 0;
- while (true) {
- if (server.closed) {
- break;
- }
-
- let tokens = nextRequestSync();
- if (tokens === 0) {
- tokens = await core.opAsync("op_flash_next_async", serverId);
- if (server.closed) {
- break;
- }
- }
-
- for (let i = offset; i < offset + tokens; i++) {
- let body = null;
- // There might be a body, but we don't expose it for GET/HEAD requests.
- // It will be closed automatically once the request has been handled and
- // the response has been sent.
- const method = getMethodSync(i);
- let hasBody = method > 2; // Not GET/HEAD/CONNECT
- if (hasBody) {
- body = createRequestBodyStream(serverId, i);
- if (body === null) {
- hasBody = false;
- }
- }
-
- const req = fromFlashRequest(
- serverId,
- /* streamRid */
- i,
- body,
- /* methodCb */
- () => methods[method],
- /* urlCb */
- () => {
- const path = ops.op_flash_path(serverId, i);
- return `${server.transport}://${server.hostname}:${server.port}${path}`;
- },
- /* headersCb */
- () => ops.op_flash_headers(serverId, i),
- );
-
- let resp;
- let remoteAddr;
- try {
- resp = handler(req, {
- get remoteAddr() {
- if (!remoteAddr) {
- const { 0: hostname, 1: port } = core.ops.op_flash_addr(
- serverId,
- i,
- );
- remoteAddr = { hostname, port };
- }
- return remoteAddr;
- },
- });
- if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) {
- PromisePrototypeCatch(
- PromisePrototypeThen(
- resp,
- (resp) =>
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
- ),
- ),
- onError,
- );
- } else if (typeof resp?.then === "function") {
- resp.then((resp) =>
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
- )
- ).catch(onError);
- } else {
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
- ).catch(onError);
- }
- } catch (e) {
- resp = await onError(e);
- }
- }
-
- offset += tokens;
- }
- await server.finished;
- },
- };
-
- signal?.addEventListener("abort", () => {
- clearInterval(dateInterval);
- PromisePrototypeThen(server.close(), () => {}, () => {});
- }, {
- once: true,
- });
-
- function tryRespondChunked(token, chunk, shutdown) {
- const nwritten = ops.op_try_flash_respond_chunked(
- serverId,
- token,
- chunk ?? new Uint8Array(),
- shutdown,
- );
- if (nwritten > 0) {
- return core.opAsync(
- "op_flash_respond_chunked",
- serverId,
- token,
- chunk,
- shutdown,
- nwritten,
- );
- }
- }
-
- function respondChunked(token, chunk, shutdown) {
- return core.opAsync(
- "op_flash_respond_chunked",
- serverId,
- token,
- chunk,
- shutdown,
- );
- }
-
- const fastOp = prepareFastCalls();
- let nextRequestSync = () => fastOp.nextRequest();
- let getMethodSync = (token) => fastOp.getMethod(token);
- let respondFast = (token, response, shutdown) =>
- fastOp.respond(token, response, shutdown);
- if (serverId > 0) {
- nextRequestSync = () => ops.op_flash_next_server(serverId);
- getMethodSync = (token) => ops.op_flash_method(serverId, token);
- respondFast = (token, response, shutdown) =>
- ops.op_flash_respond(serverId, token, response, null, shutdown);
- }
-
- if (!dateInterval) {
- date = new Date().toUTCString();
- dateInterval = setInterval(() => {
- date = new Date().toUTCString();
- }, 1000);
- }
-
- await SafePromiseAll([
- PromisePrototypeCatch(server.serve(), console.error),
- serverPromise,
- ]);
- };
-}
-
-function createRequestBodyStream(serverId, token) {
- // The first packet is left over bytes after parsing the request
- const firstRead = ops.op_flash_first_packet(
- serverId,
- token,
- );
- if (!firstRead) return null;
- let firstEnqueued = TypedArrayPrototypeGetByteLength(firstRead) === 0;
-
- return new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- if (firstEnqueued === false) {
- controller.enqueue(firstRead);
- firstEnqueued = true;
- return;
- }
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await core.opAsync(
- "op_flash_read_body",
- serverId,
- token,
- chunk,
- );
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- // We have reached the end of the body, so we close the stream.
- controller.close();
- }
- } catch (err) {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- controller.close();
- }
- },
- });
-}
-
-function upgradeHttpRaw(req) {
- if (!req[_flash]) {
- throw new TypeError(
- "Non-flash requests can not be upgraded with `upgradeHttpRaw`. Use `upgradeHttp` instead.",
- );
- }
-
- // NOTE(bartlomieju):
- // Access these fields so they are cached on `req` object, otherwise
- // they wouldn't be available after the connection gets upgraded.
- req.url;
- req.method;
- req.headers;
-
- const { serverId, streamRid } = req[_flash];
- const connRid = ops.op_flash_upgrade_http(streamRid, serverId);
- // TODO(@littledivy): return already read first packet too.
- return [new TcpConn(connRid), new Uint8Array()];
-}
-
-export { createServe, upgradeHttpRaw };