summaryrefslogtreecommitdiff
path: root/extensions/net/03_http.js
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/net/03_http.js')
-rw-r--r--extensions/net/03_http.js251
1 files changed, 251 insertions, 0 deletions
diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js
new file mode 100644
index 000000000..d5054bd1a
--- /dev/null
+++ b/extensions/net/03_http.js
@@ -0,0 +1,251 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const { InnerBody } = window.__bootstrap.fetchBody;
+ const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } =
+ window.__bootstrap.fetch;
+ const core = window.Deno.core;
+ const { BadResource, Interrupted } = core;
+ const { ReadableStream } = window.__bootstrap.streams;
+ const abortSignal = window.__bootstrap.abortSignal;
+
+ function serveHttp(conn) {
+ const rid = Deno.core.opSync("op_http_start", conn.rid);
+ return new HttpConn(rid);
+ }
+
+ const connErrorSymbol = Symbol("connError");
+
+ class HttpConn {
+ #rid = 0;
+
+ constructor(rid) {
+ this.#rid = rid;
+ }
+
+ /** @returns {number} */
+ get rid() {
+ return this.#rid;
+ }
+
+ /** @returns {Promise<ResponseEvent | null>} */
+ async nextRequest() {
+ let nextRequest;
+ try {
+ nextRequest = await Deno.core.opAsync(
+ "op_http_request_next",
+ this.#rid,
+ );
+ } catch (error) {
+ // A connection error seen here would cause disrupted responses to throw
+ // a generic `BadResource` error. Instead store this error and replace
+ // those with it.
+ this[connErrorSymbol] = error;
+ if (error instanceof BadResource) {
+ return null;
+ } else if (error instanceof Interrupted) {
+ return null;
+ } else if (error.message.includes("connection closed")) {
+ return null;
+ }
+ throw error;
+ }
+ if (nextRequest === null) return null;
+
+ const [
+ requestBodyRid,
+ responseSenderRid,
+ method,
+ headersList,
+ url,
+ ] = nextRequest;
+
+ /** @type {ReadableStream<Uint8Array> | undefined} */
+ let body = null;
+ if (typeof requestBodyRid === "number") {
+ body = createRequestBodyStream(requestBodyRid);
+ }
+
+ const innerRequest = newInnerRequest(
+ method,
+ url,
+ headersList,
+ body !== null ? new InnerBody(body) : null,
+ );
+ const signal = abortSignal.newSignal();
+ const request = fromInnerRequest(innerRequest, signal, "immutable");
+
+ const respondWith = createRespondWith(this, responseSenderRid);
+
+ return { request, respondWith };
+ }
+
+ /** @returns {void} */
+ close() {
+ core.close(this.#rid);
+ }
+
+ [Symbol.asyncIterator]() {
+ // deno-lint-ignore no-this-alias
+ const httpConn = this;
+ return {
+ async next() {
+ const reqEvt = await httpConn.nextRequest();
+ // Change with caution, current form avoids a v8 deopt
+ return { value: reqEvt, done: reqEvt === null };
+ },
+ };
+ }
+ }
+
+ function readRequest(requestRid, zeroCopyBuf) {
+ return Deno.core.opAsync(
+ "op_http_request_read",
+ requestRid,
+ zeroCopyBuf,
+ );
+ }
+
+ function createRespondWith(httpConn, responseSenderRid) {
+ return async function respondWith(resp) {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
+
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
+
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (innerResp.body.length === null) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ } else {
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ let responseBodyRid;
+ try {
+ responseBodyRid = await Deno.core.opAsync("op_http_response", [
+ responseSenderRid,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ ], respBody instanceof Uint8Array ? respBody : null);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
+ }
+
+ // If `respond` returns a responseBodyRid, we should stream the body
+ // to that resource.
+ if (responseBodyRid !== null) {
+ try {
+ if (respBody === null || !(respBody instanceof ReadableStream)) {
+ throw new TypeError("Unreachable");
+ }
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!(value instanceof Uint8Array)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
+ }
+ try {
+ await Deno.core.opAsync(
+ "op_http_response_write",
+ responseBodyRid,
+ value,
+ );
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ await reader.cancel(error);
+ throw error;
+ }
+ }
+ } finally {
+ // Once all chunks are sent, and the request body is closed, we can
+ // close the response body.
+ try {
+ await Deno.core.opAsync("op_http_response_close", responseBodyRid);
+ } catch { /* pass */ }
+ }
+ }
+ };
+ }
+
+ function createRequestBodyStream(requestBodyRid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await readRequest(
+ requestBodyRid,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(chunk.subarray(0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ core.close(requestBodyRid);
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ controller.close();
+ core.close(requestBodyRid);
+ }
+ },
+ cancel() {
+ core.close(requestBodyRid);
+ },
+ });
+ }
+
+ window.__bootstrap.http = {
+ serveHttp,
+ };
+})(this);