summaryrefslogtreecommitdiff
path: root/ext/http/01_http.js
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2021-10-04 18:50:40 -0700
committerBert Belder <bertbelder@gmail.com>2021-11-10 14:51:43 -0800
commit72a6231a614e71a57c4f8ce5f9de68ab97171dd1 (patch)
treeb19bbd749ad67f606ef331fee00bfe2b34477633 /ext/http/01_http.js
parent0cc8a9741a16efe3e37167731238b33d26887fd0 (diff)
refactor(ext/http): rewrite hyper integration and fix bug (#12732)
Fixes: #12193 Fixes: #12251 Closes: #12714
Diffstat (limited to 'ext/http/01_http.js')
-rw-r--r--ext/http/01_http.js272
1 files changed, 124 insertions, 148 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 9f05809f5..94f1a1051 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -27,6 +27,7 @@
Set,
SetPrototypeAdd,
SetPrototypeDelete,
+ SetPrototypeHas,
SetPrototypeValues,
StringPrototypeIncludes,
StringPrototypeToLowerCase,
@@ -42,6 +43,8 @@
class HttpConn {
#rid = 0;
+ #closed = false;
+
// This set holds resource ids of resources
// that were created during lifecycle of this request.
// When the connection is closed these resources should be closed
@@ -62,10 +65,11 @@
let nextRequest;
try {
nextRequest = await core.opAsync(
- "op_http_request_next",
+ "op_http_accept",
this.#rid,
);
} catch (error) {
+ this.close();
// A connection error seen here would cause disrupted responses to throw
// a generic `BadResource` error. Instead store this error and replace
// those with it.
@@ -79,26 +83,31 @@
}
throw error;
}
- if (nextRequest === null) return null;
+ if (nextRequest == null) {
+ // Work-around for servers (deno_std/http in particular) that call
+ // `nextRequest()` before upgrading a previous request which has a
+ // `connection: upgrade` header.
+ await null;
+
+ this.close();
+ return null;
+ }
const [
- requestRid,
- responseSenderRid,
+ streamRid,
method,
headersList,
url,
] = nextRequest;
+ SetPrototypeAdd(this.managedResources, streamRid);
/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
- if (typeof requestRid === "number") {
- SetPrototypeAdd(this.managedResources, requestRid);
- // There might be a body, but we don't expose it for GET/HEAD requests.
- // It will be closed automatically once the request has been handled and
- // the response has been sent.
- if (method !== "GET" && method !== "HEAD") {
- body = createRequestBodyStream(this, requestRid);
- }
+ // There might be a body, but we don't expose it for GET/HEAD requests.
+ // It will be closed automatically once the request has been handled and
+ // the response has been sent.
+ if (method !== "GET" && method !== "HEAD") {
+ body = createRequestBodyStream(streamRid);
}
const innerRequest = newInnerRequest(
@@ -111,22 +120,21 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");
- SetPrototypeAdd(this.managedResources, responseSenderRid);
- const respondWith = createRespondWith(
- this,
- responseSenderRid,
- requestRid,
- );
+ const respondWith = createRespondWith(this, streamRid);
return { request, respondWith };
}
/** @returns {void} */
close() {
- for (const rid of SetPrototypeValues(this.managedResources)) {
- core.tryClose(rid);
+ if (!this.#closed) {
+ this.#closed = true;
+ core.close(this.#rid);
+ for (const rid of SetPrototypeValues(this.managedResources)) {
+ SetPrototypeDelete(this.managedResources, rid);
+ core.close(rid);
+ }
}
- core.close(this.#rid);
}
[SymbolAsyncIterator]() {
@@ -136,97 +144,86 @@
async next() {
const reqEvt = await httpConn.nextRequest();
// Change with caution, current form avoids a v8 deopt
- return { value: reqEvt, done: reqEvt === null };
+ return { value: reqEvt ?? undefined, done: reqEvt === null };
},
};
}
}
- function readRequest(requestRid, zeroCopyBuf) {
- return core.opAsync(
- "op_http_request_read",
- requestRid,
- zeroCopyBuf,
- );
+ function readRequest(streamRid, buf) {
+ return core.opAsync("op_http_read", streamRid, buf);
}
- function createRespondWith(httpConn, responseSenderRid, requestRid) {
+ function createRespondWith(httpConn, streamRid) {
return async function respondWith(resp) {
- if (resp instanceof Promise) {
- resp = await resp;
- }
+ try {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
- if (!(resp instanceof Response)) {
- throw new TypeError(
- "First argument to respondWith must be a Response or a promise resolving to a Response.",
- );
- }
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
- const innerResp = toInnerResponse(resp);
-
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
- let respBody = null;
- if (innerResp.body !== null) {
- if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
- if (innerResp.body.streamOrStatic instanceof ReadableStream) {
- if (
- innerResp.body.length === null ||
- innerResp.body.source instanceof Blob
- ) {
- respBody = innerResp.body.stream;
- } else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) {
+ throw new TypeError("Body is unusable.");
+ }
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (
+ innerResp.body.length === null ||
+ innerResp.body.source instanceof Blob
+ ) {
+ respBody = innerResp.body.stream;
} else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
}
+ } else {
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
}
} else {
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
+ respBody = new Uint8Array(0);
}
- } else {
- respBody = new Uint8Array(0);
- }
+ const isStreamingResponseBody =
+ !(typeof respBody === "string" || respBody instanceof Uint8Array);
- SetPrototypeDelete(httpConn.managedResources, responseSenderRid);
- let responseBodyRid;
- try {
- responseBodyRid = await core.opAsync(
- "op_http_response",
- [
- responseSenderRid,
+ try {
+ await core.opAsync("op_http_write_headers", [
+ streamRid,
innerResp.status ?? 200,
innerResp.headerList,
- ],
- (respBody instanceof Uint8Array || typeof respBody === "string")
- ? respBody
- : null,
- );
- } catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (error instanceof BadResource && connError != null) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
- if (respBody !== null && respBody instanceof ReadableStream) {
- await respBody.cancel(error);
+ ], isStreamingResponseBody ? null : respBody);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
}
- throw error;
- }
- // If `respond` returns a responseBodyRid, we should stream the body
- // to that resource.
- if (responseBodyRid !== null) {
- SetPrototypeAdd(httpConn.managedResources, responseBodyRid);
- try {
+ if (isStreamingResponseBody) {
if (respBody === null || !(respBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
}
@@ -239,11 +236,7 @@
break;
}
try {
- await core.opAsync(
- "op_http_response_write",
- responseBodyRid,
- value,
- );
+ await core.opAsync("op_http_write", streamRid, value);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof BadResource && connError != null) {
@@ -254,61 +247,55 @@
throw error;
}
}
- } finally {
- // Once all chunks are sent, and the request body is closed, we can
- // close the response body.
- SetPrototypeDelete(httpConn.managedResources, responseBodyRid);
try {
- await core.opAsync("op_http_response_close", responseBodyRid);
- } catch { /* pass */ }
+ await core.opAsync("op_http_shutdown", streamRid);
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
}
- }
- const ws = resp[_ws];
- if (ws) {
- if (typeof requestRid !== "number") {
- throw new TypeError(
- "This request can not be upgraded to a websocket connection.",
+ const ws = resp[_ws];
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_http_upgrade_websocket",
+ streamRid,
);
- }
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
- const wsRid = await core.opAsync(
- "op_http_upgrade_websocket",
- requestRid,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+ httpConn.close();
- if (ws[_readyState] === WebSocket.CLOSING) {
- await core.opAsync("op_ws_close", { rid: wsRid });
+ if (ws[_readyState] === WebSocket.CLOSING) {
+ await core.opAsync("op_ws_close", { rid: wsRid });
- ws[_readyState] = WebSocket.CLOSED;
+ ws[_readyState] = WebSocket.CLOSED;
- const errEvent = new ErrorEvent("error");
- ws.dispatchEvent(errEvent);
+ const errEvent = new ErrorEvent("error");
+ ws.dispatchEvent(errEvent);
- const event = new CloseEvent("close");
- ws.dispatchEvent(event);
+ const event = new CloseEvent("close");
+ ws.dispatchEvent(event);
- core.tryClose(wsRid);
- } else {
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
+ core.tryClose(wsRid);
+ } else {
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
- ws[_eventLoop]();
+ ws[_eventLoop]();
+ }
+ }
+ } finally {
+ if (SetPrototypeHas(httpConn.managedResources, streamRid)) {
+ SetPrototypeDelete(httpConn.managedResources, streamRid);
+ core.close(streamRid);
}
- } else if (typeof requestRid === "number") {
- // Try to close "request" resource. It might have been already consumed,
- // but if it hasn't been we need to close it here to avoid resource
- // leak.
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.tryClose(requestRid);
}
};
}
- function createRequestBodyStream(httpConn, requestRid) {
+ function createRequestBodyStream(streamRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
@@ -316,32 +303,21 @@
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await readRequest(
- requestRid,
- chunk,
- );
+ const read = await readRequest(streamRid, chunk);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.close(requestRid);
}
},
- cancel() {
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.close(requestRid);
- },
});
}