summaryrefslogtreecommitdiff
path: root/ext/flash/01_http.js
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-08-18 17:35:02 +0530
committerGitHub <noreply@github.com>2022-08-18 17:35:02 +0530
commitcd21cff29942f24ba7d38287186cce64d0e84e56 (patch)
treee663eff884526ee762ae9141a3cf5a0f6967a84e /ext/flash/01_http.js
parent0b0843e4a54d7c1ddf293ac1ccee2479b69a5ba9 (diff)
feat(ext/flash): An optimized http/1.1 server (#15405)
Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com> Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl> Co-authored-by: crowlkats <crowlkats@toaxl.com> Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'ext/flash/01_http.js')
-rw-r--r--ext/flash/01_http.js569
1 files changed, 569 insertions, 0 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
new file mode 100644
index 000000000..fd817219e
--- /dev/null
+++ b/ext/flash/01_http.js
@@ -0,0 +1,569 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const { BlobPrototype } = window.__bootstrap.file;
+ const { fromFlashRequest, toInnerResponse } = window.__bootstrap.fetch;
+ const core = window.Deno.core;
+ const {
+ ReadableStream,
+ ReadableStreamPrototype,
+ getReadableStreamRid,
+ readableStreamClose,
+ _state,
+ } = window.__bootstrap.streams;
+ const {
+ WebSocket,
+ _rid,
+ _readyState,
+ _eventLoop,
+ _protocol,
+ _server,
+ _idleTimeoutDuration,
+ _idleTimeoutTimeout,
+ _serverHandleIdleTimeout,
+ } = window.__bootstrap.webSocket;
+ const { _ws } = window.__bootstrap.http;
+ const {
+ ObjectPrototypeIsPrototypeOf,
+ TypedArrayPrototypeSubarray,
+ TypeError,
+ Uint8Array,
+ Uint8ArrayPrototype,
+ } = window.__bootstrap.primordials;
+
+ const statusCodes = {
+ 100: "Continue",
+ 101: "Switching Protocols",
+ 102: "Processing",
+ 200: "OK",
+ 201: "Created",
+ 202: "Accepted",
+ 203: "Non Authoritative Information",
+ 204: "No Content",
+ 205: "Reset Content",
+ 206: "Partial Content",
+ 207: "Multi-Status",
+ 208: "Already Reported",
+ 226: "IM Used",
+ 300: "Multiple Choices",
+ 301: "Moved Permanently",
+ 302: "Found",
+ 303: "See Other",
+ 304: "Not Modified",
+ 305: "Use Proxy",
+ 307: "Temporary Redirect",
+ 308: "Permanent Redirect",
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 402: "Payment Required",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 405: "Method Not Allowed",
+ 406: "Not Acceptable",
+ 407: "Proxy Authentication Required",
+ 408: "Request Timeout",
+ 409: "Conflict",
+ 410: "Gone",
+ 411: "Length Required",
+ 412: "Precondition Failed",
+ 413: "Payload Too Large",
+ 414: "URI Too Long",
+ 415: "Unsupported Media Type",
+ 416: "Range Not Satisfiable",
+ 418: "I'm a teapot",
+ 421: "Misdirected Request",
+ 422: "Unprocessable Entity",
+ 423: "Locked",
+ 424: "Failed Dependency",
+ 426: "Upgrade Required",
+ 428: "Precondition Required",
+ 429: "Too Many Requests",
+ 431: "Request Header Fields Too Large",
+ 451: "Unavailable For Legal Reasons",
+ 500: "Internal Server Error",
+ 501: "Not Implemented",
+ 502: "Bad Gateway",
+ 503: "Service Unavailable",
+ 504: "Gateway Timeout",
+ 505: "HTTP Version Not Supported",
+ 506: "Variant Also Negotiates",
+ 507: "Insufficient Storage",
+ 508: "Loop Detected",
+ 510: "Not Extended",
+ 511: "Network Authentication Required",
+ };
+
+ const methods = {
+ 0: "GET",
+ 1: "HEAD",
+ 2: "CONNECT",
+ 3: "PUT",
+ 4: "DELETE",
+ 5: "OPTIONS",
+ 6: "TRACE",
+ 7: "POST",
+ 8: "PATCH",
+ };
+
+ let dateInterval;
+ let date;
+ let stringResources = {};
+
+ // Construct an HTTP response message.
+ // All HTTP/1.1 messages consist of a start-line followed by a sequence
+ // of octets.
+ //
+ // HTTP-message = start-line
+ // *( header-field CRLF )
+ // CRLF
+ // [ message-body ]
+ //
+ function http1Response(method, status, headerList, body, earlyEnd = false) {
+ // HTTP uses a "<major>.<minor>" numbering scheme
+ // HTTP-version = HTTP-name "/" DIGIT "." DIGIT
+ // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive
+ //
+ // status-line = HTTP-version SP status-code SP reason-phrase CRLF
+ // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
+ let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`;
+ for (const [name, value] of headerList) {
+ // header-field = field-name ":" OWS field-value OWS
+ str += `${name}: ${value}\r\n`;
+ }
+
+ // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
+ if (status === 205 || status === 304) {
+ // MUST NOT generate a payload in a 205 response.
+ // indicate a zero-length body for the response by
+ // including a Content-Length header field with a value of 0.
+ str += "Content-Length: 0\r\n";
+ return str;
+ }
+
+ // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
+ if (status == 204 && status <= 100) {
+ return str;
+ }
+
+ if (earlyEnd === true) {
+ return str;
+ }
+
+ // null body status is validated by inititalizeAResponse in ext/fetch
+ if (body !== null && body !== undefined) {
+ str += `Content-Length: ${body.length}\r\n\r\n`;
+ } else {
+ str += "Transfer-Encoding: chunked\r\n\r\n";
+ return str;
+ }
+
+ // A HEAD request.
+ if (method === 1) return str;
+
+ if (typeof body === "string") {
+ str += body ?? "";
+ } else {
+ const head = core.encode(str);
+ const response = new Uint8Array(head.byteLength + body.byteLength);
+ response.set(head, 0);
+ response.set(body, head.byteLength);
+ return response;
+ }
+
+ return str;
+ }
+
+ function prepareFastCalls() {
+ return core.opSync("op_flash_make_request");
+ }
+
+ function hostnameForDisplay(hostname) {
+ // If the hostname is "0.0.0.0", we display "localhost" in console
+ // because browsers in Windows don't resolve "0.0.0.0".
+ // See the discussion in https://github.com/denoland/deno_std/issues/1165
+ return hostname === "0.0.0.0" ? "localhost" : hostname;
+ }
+
+ function serve(handler, opts = {}) {
+ delete opts.key;
+ delete opts.cert;
+ return serveInner(handler, opts, false);
+ }
+
+ function serveTls(handler, opts = {}) {
+ return serveInner(handler, opts, true);
+ }
+
+ function serveInner(handler, opts, useTls) {
+ opts = { hostname: "127.0.0.1", port: 9000, useTls, ...opts };
+ const signal = opts.signal;
+ delete opts.signal;
+ const onError = opts.onError ?? function (error) {
+ console.error(error);
+ return new Response("Internal Server Error", { status: 500 });
+ };
+ delete opts.onError;
+ const onListen = opts.onListen ?? function () {
+ console.log(
+ `Listening on http://${
+ hostnameForDisplay(opts.hostname)
+ }:${opts.port}/`,
+ );
+ };
+ delete opts.onListen;
+ const serverId = core.ops.op_flash_serve(opts);
+ const serverPromise = core.opAsync("op_flash_drive_server", serverId);
+
+ core.opAsync("op_flash_wait_for_listening", serverId).then(() => {
+ onListen({ hostname: opts.hostname, port: opts.port });
+ });
+
+ const server = {
+ id: serverId,
+ transport: opts.cert && opts.key ? "https" : "http",
+ hostname: opts.hostname,
+ port: opts.port,
+ closed: false,
+ finished: (async () => {
+ return await serverPromise;
+ })(),
+ async close() {
+ if (server.closed) {
+ return;
+ }
+ server.closed = true;
+ await core.opAsync("op_flash_close_server", serverId);
+ await server.finished;
+ },
+ async serve() {
+ while (true) {
+ if (server.closed) {
+ break;
+ }
+
+ let token = nextRequestSync();
+ if (token === 0) {
+ token = await core.opAsync("op_flash_next_async", serverId);
+ if (server.closed) {
+ break;
+ }
+ }
+
+ for (let i = 0; i < token; i++) {
+ let body = null;
+ // There might be a body, but we don't expose it for GET/HEAD requests.
+ // It will be closed automatically once the request has been handled and
+ // the response has been sent.
+ const method = getMethodSync(i);
+ let hasBody = method > 2; // Not GET/HEAD/CONNECT
+ if (hasBody) {
+ body = createRequestBodyStream(serverId, i);
+ if (body === null) {
+ hasBody = false;
+ }
+ }
+
+ const req = fromFlashRequest(
+ serverId,
+ /* streamRid */
+ i,
+ body,
+ /* methodCb */
+ () => methods[method],
+ /* urlCb */
+ () => {
+ const path = core.ops.op_flash_path(serverId, i);
+ return `${server.transport}://${server.hostname}:${server.port}${path}`;
+ },
+ /* headersCb */
+ () => core.ops.op_flash_headers(serverId, i),
+ );
+
+ let resp;
+ try {
+ resp = await handler(req);
+ } catch (e) {
+ resp = await onError(e);
+ }
+ // there might've been an HTTP upgrade.
+ if (resp === undefined) {
+ continue;
+ }
+
+ const ws = resp[_ws];
+ if (!ws) {
+ if (hasBody && body[_state] !== "closed") {
+ // TODO(@littledivy): Optimize by draining in a single op.
+ try {
+ await req.arrayBuffer();
+ } catch { /* pass */ }
+ }
+ }
+
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ let isStreamingResponseBody = false;
+ if (innerResp.body !== null) {
+ if (typeof innerResp.body.streamOrStatic?.body === "string") {
+ if (innerResp.body.streamOrStatic.consumed === true) {
+ throw new TypeError("Body is unusable.");
+ }
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ isStreamingResponseBody = false;
+ } else if (
+ ObjectPrototypeIsPrototypeOf(
+ ReadableStreamPrototype,
+ innerResp.body.streamOrStatic,
+ )
+ ) {
+ if (innerResp.body.unusable()) {
+ throw new TypeError("Body is unusable.");
+ }
+ if (
+ innerResp.body.length === null ||
+ ObjectPrototypeIsPrototypeOf(
+ BlobPrototype,
+ innerResp.body.source,
+ )
+ ) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ isStreamingResponseBody = !(
+ typeof respBody === "string" ||
+ ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
+ );
+ } else {
+ if (innerResp.body.streamOrStatic.consumed === true) {
+ throw new TypeError("Body is unusable.");
+ }
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ if (isStreamingResponseBody === true) {
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ const reader = respBody.getReader(); // Aquire JS lock.
+ try {
+ core.opAsync(
+ "op_flash_write_resource",
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ true,
+ ),
+ serverId,
+ i,
+ resourceRid,
+ ).then(() => {
+ // Release JS lock.
+ readableStreamClose(respBody);
+ });
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
+ } else {
+ const reader = respBody.getReader();
+ let first = true;
+ a:
+ while (true) {
+ const { value, done } = await reader.read();
+ if (first) {
+ first = false;
+ core.ops.op_flash_respond(
+ serverId,
+ i,
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ ),
+ value ?? new Uint8Array(),
+ false,
+ );
+ } else {
+ if (value === undefined) {
+ core.ops.op_flash_respond_chuncked(
+ serverId,
+ i,
+ undefined,
+ done,
+ );
+ } else {
+ respondChunked(
+ i,
+ value,
+ done,
+ );
+ }
+ }
+ if (done) break a;
+ }
+ }
+ } else {
+ const responseStr = http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ respBody,
+ );
+
+ // TypedArray
+ if (typeof responseStr !== "string") {
+ respondFast(i, responseStr, !ws);
+ } else {
+ // string
+ const maybeResponse = stringResources[responseStr];
+ if (maybeResponse === undefined) {
+ stringResources[responseStr] = core.encode(responseStr);
+ core.ops.op_flash_respond(
+ serverId,
+ i,
+ stringResources[responseStr],
+ null,
+ !ws, // Don't close socket if there is a deferred websocket upgrade.
+ );
+ } else {
+ respondFast(i, maybeResponse, !ws);
+ }
+ }
+ }
+
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_flash_upgrade_websocket",
+ serverId,
+ i,
+ );
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ if (ws[_idleTimeoutDuration]) {
+ ws.addEventListener(
+ "close",
+ () => clearTimeout(ws[_idleTimeoutTimeout]),
+ );
+ }
+ ws[_serverHandleIdleTimeout]();
+ }
+ }
+ }
+ await server.finished;
+ },
+ };
+
+ signal?.addEventListener("abort", () => {
+ clearInterval(dateInterval);
+ server.close().then(() => {}, () => {});
+ }, {
+ once: true,
+ });
+
+ const fastOp = prepareFastCalls();
+ let nextRequestSync = () => fastOp.nextRequest();
+ let getMethodSync = (token) => fastOp.getMethod(token);
+ let respondChunked = (token, chunk, shutdown) =>
+ fastOp.respondChunked(token, chunk, shutdown);
+ let respondFast = (token, response, shutdown) =>
+ fastOp.respond(token, response, shutdown);
+ if (serverId > 0) {
+ nextRequestSync = () => core.ops.op_flash_next_server(serverId);
+ getMethodSync = (token) => core.ops.op_flash_method(serverId, token);
+ respondChunked = (token, chunk, shutdown) =>
+ core.ops.op_flash_respond_chuncked(serverId, token, chunk, shutdown);
+ respondFast = (token, response, shutdown) =>
+ core.ops.op_flash_respond(serverId, token, response, null, shutdown);
+ }
+
+ if (!dateInterval) {
+ date = new Date().toUTCString();
+ dateInterval = setInterval(() => {
+ date = new Date().toUTCString();
+ stringResources = {};
+ }, 1000);
+ }
+
+ return server.serve().catch(console.error);
+ }
+
+ function createRequestBodyStream(serverId, token) {
+ // The first packet is left over bytes after parsing the request
+ const firstRead = core.ops.op_flash_first_packet(
+ serverId,
+ token,
+ );
+ if (!firstRead) return null;
+ let firstEnqueued = firstRead.byteLength == 0;
+
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ if (firstEnqueued === false) {
+ controller.enqueue(firstRead);
+ firstEnqueued = true;
+ return;
+ }
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await core.opAsync(
+ "op_flash_read_body",
+ serverId,
+ token,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ controller.close();
+ }
+ },
+ });
+ }
+
+ window.__bootstrap.flash = {
+ serve,
+ serveTls,
+ };
+})(this);