summaryrefslogtreecommitdiff
path: root/extensions/http/01_http.js
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/http/01_http.js')
-rw-r--r--extensions/http/01_http.js374
1 files changed, 374 insertions, 0 deletions
diff --git a/extensions/http/01_http.js b/extensions/http/01_http.js
new file mode 100644
index 000000000..4bcdf1f07
--- /dev/null
+++ b/extensions/http/01_http.js
@@ -0,0 +1,374 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const webidl = window.__bootstrap.webidl;
+ const { InnerBody } = window.__bootstrap.fetchBody;
+ const { setEventTargetData } = window.__bootstrap.eventTarget;
+ const {
+ Response,
+ fromInnerRequest,
+ toInnerResponse,
+ newInnerRequest,
+ newInnerResponse,
+ fromInnerResponse,
+ } = window.__bootstrap.fetch;
+ const core = window.Deno.core;
+ const { BadResource, Interrupted } = core;
+ const { ReadableStream } = window.__bootstrap.streams;
+ const abortSignal = window.__bootstrap.abortSignal;
+ const { WebSocket, _rid, _readyState, _eventLoop, _protocol } =
+ window.__bootstrap.webSocket;
+ const {
+ ArrayPrototypeIncludes,
+ ArrayPrototypePush,
+ Promise,
+ StringPrototypeIncludes,
+ StringPrototypeSplit,
+ Symbol,
+ SymbolAsyncIterator,
+ TypedArrayPrototypeSubarray,
+ TypeError,
+ Uint8Array,
+ } = window.__bootstrap.primordials;
+
+ const connErrorSymbol = Symbol("connError");
+
+ class HttpConn {
+ #rid = 0;
+
+ constructor(rid) {
+ this.#rid = rid;
+ }
+
+ /** @returns {number} */
+ get rid() {
+ return this.#rid;
+ }
+
+ /** @returns {Promise<ResponseEvent | null>} */
+ async nextRequest() {
+ let nextRequest;
+ try {
+ nextRequest = await core.opAsync(
+ "op_http_request_next",
+ this.#rid,
+ );
+ } catch (error) {
+ // A connection error seen here would cause disrupted responses to throw
+ // a generic `BadResource` error. Instead store this error and replace
+ // those with it.
+ this[connErrorSymbol] = error;
+ if (error instanceof BadResource) {
+ return null;
+ } else if (error instanceof Interrupted) {
+ return null;
+ } else if (
+ StringPrototypeIncludes(error.message, "connection closed")
+ ) {
+ return null;
+ }
+ throw error;
+ }
+ if (nextRequest === null) return null;
+
+ const [
+ requestRid,
+ responseSenderRid,
+ method,
+ headersList,
+ url,
+ ] = nextRequest;
+
+ /** @type {ReadableStream<Uint8Array> | undefined} */
+ let body = null;
+ if (typeof requestRid === "number") {
+ body = createRequestBodyStream(requestRid);
+ }
+
+ const innerRequest = newInnerRequest(
+ method,
+ url,
+ headersList,
+ body !== null ? new InnerBody(body) : null,
+ );
+ const signal = abortSignal.newSignal();
+ const request = fromInnerRequest(innerRequest, signal, "immutable");
+
+ const respondWith = createRespondWith(
+ this,
+ responseSenderRid,
+ requestRid,
+ );
+
+ return { request, respondWith };
+ }
+
+ /** @returns {void} */
+ close() {
+ core.close(this.#rid);
+ }
+
+ [SymbolAsyncIterator]() {
+ // deno-lint-ignore no-this-alias
+ const httpConn = this;
+ return {
+ async next() {
+ const reqEvt = await httpConn.nextRequest();
+ // Change with caution, current form avoids a v8 deopt
+ return { value: reqEvt, done: reqEvt === null };
+ },
+ };
+ }
+ }
+
+ function readRequest(requestRid, zeroCopyBuf) {
+ return core.opAsync(
+ "op_http_request_read",
+ requestRid,
+ zeroCopyBuf,
+ );
+ }
+
+ function createRespondWith(httpConn, responseSenderRid, requestRid) {
+ return async function respondWith(resp) {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
+
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
+
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (
+ innerResp.body.length === null ||
+ innerResp.body.source instanceof Blob
+ ) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ } else {
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ let responseBodyRid;
+ try {
+ responseBodyRid = await core.opAsync("op_http_response", [
+ responseSenderRid,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ ], respBody instanceof Uint8Array ? respBody : null);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
+ }
+
+ // If `respond` returns a responseBodyRid, we should stream the body
+ // to that resource.
+ if (responseBodyRid !== null) {
+ try {
+ if (respBody === null || !(respBody instanceof ReadableStream)) {
+ throw new TypeError("Unreachable");
+ }
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!(value instanceof Uint8Array)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
+ }
+ try {
+ await core.opAsync(
+ "op_http_response_write",
+ responseBodyRid,
+ value,
+ );
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ await reader.cancel(error);
+ throw error;
+ }
+ }
+ } finally {
+ // Once all chunks are sent, and the request body is closed, we can
+ // close the response body.
+ try {
+ await core.opAsync("op_http_response_close", responseBodyRid);
+ } catch { /* pass */ }
+ }
+ }
+
+ const ws = resp[_ws];
+ if (ws) {
+ if (typeof requestRid !== "number") {
+ throw new TypeError(
+ "This request can not be upgraded to a websocket connection.",
+ );
+ }
+
+ const wsRid = await core.opAsync(
+ "op_http_upgrade_websocket",
+ requestRid,
+ );
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ if (ws[_readyState] === WebSocket.CLOSING) {
+ await core.opAsync("op_ws_close", { rid: wsRid });
+
+ ws[_readyState] = WebSocket.CLOSED;
+
+ const errEvent = new ErrorEvent("error");
+ ws.dispatchEvent(errEvent);
+
+ const event = new CloseEvent("close");
+ ws.dispatchEvent(event);
+
+ try {
+ core.close(wsRid);
+ } catch (err) {
+ // Ignore error if the socket has already been closed.
+ if (!(err instanceof Deno.errors.BadResource)) throw err;
+ }
+ } else {
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ }
+ }
+ };
+ }
+
+ function createRequestBodyStream(requestRid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await readRequest(
+ requestRid,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ core.close(requestRid);
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ controller.close();
+ core.close(requestRid);
+ }
+ },
+ cancel() {
+ core.close(requestRid);
+ },
+ });
+ }
+
+ const _ws = Symbol("[[associated_ws]]");
+
+ function upgradeWebSocket(request, options = {}) {
+ if (request.headers.get("upgrade") !== "websocket") {
+ throw new TypeError(
+ "Invalid Header: 'upgrade' header must be 'websocket'",
+ );
+ }
+
+ if (request.headers.get("connection") !== "Upgrade") {
+ throw new TypeError(
+ "Invalid Header: 'connection' header must be 'Upgrade'",
+ );
+ }
+
+ const websocketKey = request.headers.get("sec-websocket-key");
+ if (websocketKey === null) {
+ throw new TypeError(
+ "Invalid Header: 'sec-websocket-key' header must be set",
+ );
+ }
+
+ const accept = core.opSync("op_http_websocket_accept_header", websocketKey);
+
+ const r = newInnerResponse(101);
+ r.headerList = [
+ ["upgrade", "websocket"],
+ ["connection", "Upgrade"],
+ ["sec-websocket-accept", accept],
+ ];
+
+ const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
+ const protocols = StringPrototypeSplit(protocolsStr, ", ");
+ if (protocols && options.protocol) {
+ if (ArrayPrototypeIncludes(protocols, options.protocol)) {
+ ArrayPrototypePush(r.headerList, [
+ "sec-websocket-protocol",
+ options.protocol,
+ ]);
+ } else {
+ throw new TypeError(
+ `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
+ );
+ }
+ }
+
+ const response = fromInnerResponse(r, "immutable");
+
+ const websocket = webidl.createBranded(WebSocket);
+ setEventTargetData(websocket);
+ response[_ws] = websocket;
+
+ return { response, websocket };
+ }
+
+ window.__bootstrap.http = {
+ HttpConn,
+ upgradeWebSocket,
+ };
+})(this);