diff options
Diffstat (limited to 'ext/flash/01_http.js')
-rw-r--r-- | ext/flash/01_http.js | 793 |
1 files changed, 0 insertions, 793 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js deleted file mode 100644 index fe503ed05..000000000 --- a/ext/flash/01_http.js +++ /dev/null @@ -1,793 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -const core = globalThis.Deno.core; -const ops = core.ops; -const primordials = globalThis.__bootstrap.primordials; -import { BlobPrototype } from "ext:deno_web/09_file.js"; -import { TcpConn } from "ext:deno_net/01_net.js"; -import { toInnerResponse } from "ext:deno_fetch/23_response.js"; -import { _flash, fromFlashRequest } from "ext:deno_fetch/23_request.js"; -import { Event } from "ext:deno_web/02_event.js"; -import { - _state, - getReadableStreamResourceBacking, - ReadableStream, - readableStreamClose, - ReadableStreamPrototype, -} from "ext:deno_web/06_streams.js"; -import { - _eventLoop, - _idleTimeoutDuration, - _idleTimeoutTimeout, - _protocol, - _readyState, - _rid, - _serverHandleIdleTimeout, - WebSocket, -} from "ext:deno_websocket/01_websocket.js"; -import { _ws } from "ext:deno_http/01_http.js"; -const { - ObjectPrototypeIsPrototypeOf, - PromisePrototype, - PromisePrototypeCatch, - PromisePrototypeThen, - SafePromiseAll, - TypedArrayPrototypeGetByteLength, - TypedArrayPrototypeGetSymbolToStringTag, - TypedArrayPrototypeSet, - TypedArrayPrototypeSubarray, - TypeError, - Uint8Array, -} = primordials; - -const statusCodes = { - 100: "Continue", - 101: "Switching Protocols", - 102: "Processing", - 200: "OK", - 201: "Created", - 202: "Accepted", - 203: "Non Authoritative Information", - 204: "No Content", - 205: "Reset Content", - 206: "Partial Content", - 207: "Multi-Status", - 208: "Already Reported", - 226: "IM Used", - 300: "Multiple Choices", - 301: "Moved Permanently", - 302: "Found", - 303: "See Other", - 304: "Not Modified", - 305: "Use Proxy", - 307: "Temporary Redirect", - 308: "Permanent Redirect", - 400: "Bad Request", - 401: "Unauthorized", - 402: "Payment Required", - 403: "Forbidden", - 404: "Not Found", - 405: "Method Not Allowed", - 406: "Not Acceptable", - 407: "Proxy Authentication Required", - 408: "Request Timeout", - 409: "Conflict", - 410: "Gone", - 411: "Length Required", - 412: "Precondition Failed", - 413: "Payload Too Large", - 414: "URI Too Long", - 415: "Unsupported Media Type", - 416: "Range Not Satisfiable", - 418: "I'm a teapot", - 421: "Misdirected Request", - 422: "Unprocessable Entity", - 423: "Locked", - 424: "Failed Dependency", - 426: "Upgrade Required", - 428: "Precondition Required", - 429: "Too Many Requests", - 431: "Request Header Fields Too Large", - 451: "Unavailable For Legal Reasons", - 500: "Internal Server Error", - 501: "Not Implemented", - 502: "Bad Gateway", - 503: "Service Unavailable", - 504: "Gateway Timeout", - 505: "HTTP Version Not Supported", - 506: "Variant Also Negotiates", - 507: "Insufficient Storage", - 508: "Loop Detected", - 510: "Not Extended", - 511: "Network Authentication Required", -}; - -const methods = { - 0: "GET", - 1: "HEAD", - 2: "CONNECT", - 3: "PUT", - 4: "DELETE", - 5: "OPTIONS", - 6: "TRACE", - 7: "POST", - 8: "PATCH", -}; - -let dateInterval; -let date; - -/** - * Construct an HTTP response message. - * All HTTP/1.1 messages consist of a start-line followed by a sequence - * of octets. - * - * HTTP-message = start-line - * *( header-field CRLF ) - * CRLF - * [ message-body ] - * - * @param {keyof typeof methods} method - * @param {keyof typeof statusCodes} status - * @param {[name: string, value: string][]} headerList - * @param {Uint8Array | string | null} body - * @param {number} bodyLen - * @param {boolean} earlyEnd - * @returns {Uint8Array | string} - */ -function http1Response( - method, - status, - headerList, - body, - bodyLen, - earlyEnd = false, -) { - // HTTP uses a "<major>.<minor>" numbering scheme - // HTTP-version = HTTP-name "/" DIGIT "." DIGIT - // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive - // - // status-line = HTTP-version SP status-code SP reason-phrase CRLF - // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2 - let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`; - for (let i = 0; i < headerList.length; ++i) { - const { 0: name, 1: value } = headerList[i]; - // header-field = field-name ":" OWS field-value OWS - str += `${name}: ${value}\r\n`; - } - - // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6 - if (status === 205 || status === 304) { - // MUST NOT generate a payload in a 205 response. - // indicate a zero-length body for the response by - // including a Content-Length header field with a value of 0. - str += "Content-Length: 0\r\n\r\n"; - return str; - } - - // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204. - if (status === 204 || status < 200) { - str += "\r\n"; - return str; - } - - if (earlyEnd === true) { - return str; - } - - // null body status is validated by inititalizeAResponse in ext/fetch - if (body !== null && body !== undefined) { - str += `Content-Length: ${bodyLen}\r\n\r\n`; - } else { - str += "Transfer-Encoding: chunked\r\n\r\n"; - return str; - } - - // A HEAD request. - if (method === 1) return str; - - if (typeof body === "string") { - str += body ?? ""; - } else { - const head = core.encode(str); - const response = new Uint8Array( - TypedArrayPrototypeGetByteLength(head) + bodyLen, - ); - TypedArrayPrototypeSet(response, head, 0); - TypedArrayPrototypeSet( - response, - body, - TypedArrayPrototypeGetByteLength(head), - ); - return response; - } - - return str; -} - -function prepareFastCalls() { - return ops.op_flash_make_request(); -} - -function hostnameForDisplay(hostname) { - // If the hostname is "0.0.0.0", we display "localhost" in console - // because browsers in Windows don't resolve "0.0.0.0". - // See the discussion in https://github.com/denoland/deno_std/issues/1165 - return hostname === "0.0.0.0" ? "localhost" : hostname; -} - -function writeFixedResponse( - server, - requestId, - response, - responseLen, - end, - respondFast, -) { - let nwritten = 0; - // TypedArray - if (typeof response !== "string") { - nwritten = respondFast(requestId, response, end); - } else { - // string - nwritten = ops.op_flash_respond( - server, - requestId, - response, - end, - ); - } - - if (nwritten < responseLen) { - core.opAsync( - "op_flash_respond_async", - server, - requestId, - response.slice(nwritten), - end, - ); - } -} - -// TODO(@littledivy): Woah woah, cut down the number of arguments. -async function handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, -) { - // there might've been an HTTP upgrade. - if (resp === undefined) { - return; - } - const innerResp = toInnerResponse(resp); - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream<Uint8Array> | Uint8Array | string | null} */ - let respBody = null; - let isStreamingResponseBody = false; - if (innerResp.body !== null) { - if (typeof innerResp.body.streamOrStatic?.body === "string") { - if (innerResp.body.streamOrStatic.consumed === true) { - throw new TypeError("Body is unusable."); - } - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - isStreamingResponseBody = false; - } else if ( - ObjectPrototypeIsPrototypeOf( - ReadableStreamPrototype, - innerResp.body.streamOrStatic, - ) - ) { - if (innerResp.body.unusable()) { - throw new TypeError("Body is unusable."); - } - if ( - innerResp.body.length === null || - ObjectPrototypeIsPrototypeOf( - BlobPrototype, - innerResp.body.source, - ) - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - } - isStreamingResponseBody = !( - typeof respBody === "string" || - TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array" - ); - } else { - if (innerResp.body.streamOrStatic.consumed === true) { - throw new TypeError("Body is unusable."); - } - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - } - } else { - respBody = new Uint8Array(0); - } - - const ws = resp[_ws]; - if (isStreamingResponseBody === false) { - const length = typeof respBody === "string" - ? core.byteLength(respBody) - : TypedArrayPrototypeGetByteLength(respBody); - const responseStr = http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - respBody, - length, - ); - // A HEAD request always ignores body, but includes the correct content-length size. - const responseLen = method === 1 ? core.byteLength(responseStr) : length; - writeFixedResponse( - serverId, - i, - responseStr, - responseLen, - !ws, // Don't close socket if there is a deferred websocket upgrade. - respondFast, - ); - } - - return (async () => { - if (!ws) { - if (hasBody && body[_state] !== "closed") { - // TODO(@littledivy): Optimize by draining in a single op. - try { - await req.arrayBuffer(); - } catch { /* pass */ } - } - } - - if (isStreamingResponseBody === true) { - const resourceBacking = getReadableStreamResourceBacking(respBody); - if (resourceBacking) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); - } - const reader = respBody.getReader(); // Aquire JS lock. - try { - PromisePrototypeThen( - core.opAsync( - "op_flash_write_resource", - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - 0, // Content-Length will be set by the op. - true, - ), - serverId, - i, - resourceBacking.rid, - resourceBacking.autoClose, - ), - () => { - // Release JS lock. - readableStreamClose(respBody); - }, - ); - } catch (error) { - await reader.cancel(error); - throw error; - } - } else { - const reader = respBody.getReader(); - - // Best case: sends headers + first chunk in a single go. - const { value, done } = await reader.read(); - writeFixedResponse( - serverId, - i, - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - // deno-lint-ignore prefer-primordials - respBody.byteLength, - ), - // deno-lint-ignore prefer-primordials - respBody.byteLength, - false, - respondFast, - ); - - await tryRespondChunked( - i, - value, - done, - ); - - if (!done) { - while (true) { - const chunk = await reader.read(); - await respondChunked( - i, - chunk.value, - chunk.done, - ); - if (chunk.done) break; - } - } - } - } - - if (ws) { - const wsRid = await core.opAsync( - "op_flash_upgrade_websocket", - serverId, - i, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - if (ws[_idleTimeoutDuration]) { - ws.addEventListener( - "close", - () => clearTimeout(ws[_idleTimeoutTimeout]), - ); - } - ws[_serverHandleIdleTimeout](); - } - })(); -} - -function createServe(opFn) { - return async function serve(arg1, arg2) { - let options = undefined; - let handler = undefined; - if (typeof arg1 === "function") { - handler = arg1; - options = arg2; - } else if (typeof arg2 === "function") { - handler = arg2; - options = arg1; - } else { - options = arg1; - } - if (handler === undefined) { - if (options === undefined) { - throw new TypeError( - "No handler was provided, so an options bag is mandatory.", - ); - } - handler = options.handler; - } - if (typeof handler !== "function") { - throw new TypeError("A handler function must be provided."); - } - if (options === undefined) { - options = {}; - } - - const signal = options.signal; - - const onError = options.onError ?? function (error) { - console.error(error); - return new Response("Internal Server Error", { status: 500 }); - }; - - const onListen = options.onListen ?? function ({ port }) { - console.log( - `Listening on http://${ - hostnameForDisplay(listenOpts.hostname) - }:${port}/`, - ); - }; - - const listenOpts = { - hostname: options.hostname ?? "127.0.0.1", - port: options.port ?? 9000, - reuseport: options.reusePort ?? false, - }; - if (options.cert || options.key) { - if (!options.cert || !options.key) { - throw new TypeError( - "Both cert and key must be provided to enable HTTPS.", - ); - } - listenOpts.cert = options.cert; - listenOpts.key = options.key; - } - - const serverId = opFn(listenOpts); - const serverPromise = core.opAsync("op_flash_drive_server", serverId); - - PromisePrototypeCatch( - PromisePrototypeThen( - core.opAsync("op_flash_wait_for_listening", serverId), - (port) => { - onListen({ hostname: listenOpts.hostname, port }); - }, - ), - () => {}, - ); - const finishedPromise = PromisePrototypeCatch(serverPromise, () => {}); - - const server = { - id: serverId, - transport: listenOpts.cert && listenOpts.key ? "https" : "http", - hostname: listenOpts.hostname, - port: listenOpts.port, - closed: false, - finished: finishedPromise, - async close() { - if (server.closed) { - return; - } - server.closed = true; - await core.opAsync("op_flash_close_server", serverId); - await server.finished; - }, - async serve() { - let offset = 0; - while (true) { - if (server.closed) { - break; - } - - let tokens = nextRequestSync(); - if (tokens === 0) { - tokens = await core.opAsync("op_flash_next_async", serverId); - if (server.closed) { - break; - } - } - - for (let i = offset; i < offset + tokens; i++) { - let body = null; - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - const method = getMethodSync(i); - let hasBody = method > 2; // Not GET/HEAD/CONNECT - if (hasBody) { - body = createRequestBodyStream(serverId, i); - if (body === null) { - hasBody = false; - } - } - - const req = fromFlashRequest( - serverId, - /* streamRid */ - i, - body, - /* methodCb */ - () => methods[method], - /* urlCb */ - () => { - const path = ops.op_flash_path(serverId, i); - return `${server.transport}://${server.hostname}:${server.port}${path}`; - }, - /* headersCb */ - () => ops.op_flash_headers(serverId, i), - ); - - let resp; - let remoteAddr; - try { - resp = handler(req, { - get remoteAddr() { - if (!remoteAddr) { - const { 0: hostname, 1: port } = core.ops.op_flash_addr( - serverId, - i, - ); - remoteAddr = { hostname, port }; - } - return remoteAddr; - }, - }); - if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) { - PromisePrototypeCatch( - PromisePrototypeThen( - resp, - (resp) => - handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, - ), - ), - onError, - ); - } else if (typeof resp?.then === "function") { - resp.then((resp) => - handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, - ) - ).catch(onError); - } else { - handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, - ).catch(onError); - } - } catch (e) { - resp = await onError(e); - } - } - - offset += tokens; - } - await server.finished; - }, - }; - - signal?.addEventListener("abort", () => { - clearInterval(dateInterval); - PromisePrototypeThen(server.close(), () => {}, () => {}); - }, { - once: true, - }); - - function tryRespondChunked(token, chunk, shutdown) { - const nwritten = ops.op_try_flash_respond_chunked( - serverId, - token, - chunk ?? new Uint8Array(), - shutdown, - ); - if (nwritten > 0) { - return core.opAsync( - "op_flash_respond_chunked", - serverId, - token, - chunk, - shutdown, - nwritten, - ); - } - } - - function respondChunked(token, chunk, shutdown) { - return core.opAsync( - "op_flash_respond_chunked", - serverId, - token, - chunk, - shutdown, - ); - } - - const fastOp = prepareFastCalls(); - let nextRequestSync = () => fastOp.nextRequest(); - let getMethodSync = (token) => fastOp.getMethod(token); - let respondFast = (token, response, shutdown) => - fastOp.respond(token, response, shutdown); - if (serverId > 0) { - nextRequestSync = () => ops.op_flash_next_server(serverId); - getMethodSync = (token) => ops.op_flash_method(serverId, token); - respondFast = (token, response, shutdown) => - ops.op_flash_respond(serverId, token, response, null, shutdown); - } - - if (!dateInterval) { - date = new Date().toUTCString(); - dateInterval = setInterval(() => { - date = new Date().toUTCString(); - }, 1000); - } - - await SafePromiseAll([ - PromisePrototypeCatch(server.serve(), console.error), - serverPromise, - ]); - }; -} - -function createRequestBodyStream(serverId, token) { - // The first packet is left over bytes after parsing the request - const firstRead = ops.op_flash_first_packet( - serverId, - token, - ); - if (!firstRead) return null; - let firstEnqueued = TypedArrayPrototypeGetByteLength(firstRead) === 0; - - return new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - if (firstEnqueued === false) { - controller.enqueue(firstRead); - firstEnqueued = true; - return; - } - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - const read = await core.opAsync( - "op_flash_read_body", - serverId, - token, - chunk, - ); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - // We have reached the end of the body, so we close the stream. - controller.close(); - } - } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - } - }, - }); -} - -function upgradeHttpRaw(req) { - if (!req[_flash]) { - throw new TypeError( - "Non-flash requests can not be upgraded with `upgradeHttpRaw`. Use `upgradeHttp` instead.", - ); - } - - // NOTE(bartlomieju): - // Access these fields so they are cached on `req` object, otherwise - // they wouldn't be available after the connection gets upgraded. - req.url; - req.method; - req.headers; - - const { serverId, streamRid } = req[_flash]; - const connRid = ops.op_flash_upgrade_http(streamRid, serverId); - // TODO(@littledivy): return already read first packet too. - return [new TcpConn(connRid), new Uint8Array()]; -} - -export { createServe, upgradeHttpRaw }; |