summaryrefslogtreecommitdiff
path: root/ext/http/01_http.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/01_http.js')
-rw-r--r--ext/http/01_http.js267
1 files changed, 148 insertions, 119 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index d06be2142..9f05809f5 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -27,7 +27,6 @@
Set,
SetPrototypeAdd,
SetPrototypeDelete,
- SetPrototypeHas,
SetPrototypeValues,
StringPrototypeIncludes,
StringPrototypeToLowerCase,
@@ -43,8 +42,6 @@
class HttpConn {
#rid = 0;
- #closed = false;
-
// This set holds resource ids of resources
// that were created during lifecycle of this request.
// When the connection is closed these resources should be closed
@@ -65,11 +62,10 @@
let nextRequest;
try {
nextRequest = await core.opAsync(
- "op_http_accept",
+ "op_http_request_next",
this.#rid,
);
} catch (error) {
- this.close();
// A connection error seen here would cause disrupted responses to throw
// a generic `BadResource` error. Instead store this error and replace
// those with it.
@@ -83,26 +79,26 @@
}
throw error;
}
- if (nextRequest == null) {
- this.close();
- return null;
- }
+ if (nextRequest === null) return null;
const [
- streamRid,
+ requestRid,
+ responseSenderRid,
method,
headersList,
url,
] = nextRequest;
- SetPrototypeAdd(this.managedResources, streamRid);
/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
- // There might be a body, but we don't expose it for GET/HEAD requests.
- // It will be closed automatically once the request has been handled and
- // the response has been sent.
- if (method !== "GET" && method !== "HEAD") {
- body = createRequestBodyStream(streamRid);
+ if (typeof requestRid === "number") {
+ SetPrototypeAdd(this.managedResources, requestRid);
+ // There might be a body, but we don't expose it for GET/HEAD requests.
+ // It will be closed automatically once the request has been handled and
+ // the response has been sent.
+ if (method !== "GET" && method !== "HEAD") {
+ body = createRequestBodyStream(this, requestRid);
+ }
}
const innerRequest = newInnerRequest(
@@ -115,21 +111,22 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");
- const respondWith = createRespondWith(this, streamRid);
+ SetPrototypeAdd(this.managedResources, responseSenderRid);
+ const respondWith = createRespondWith(
+ this,
+ responseSenderRid,
+ requestRid,
+ );
return { request, respondWith };
}
/** @returns {void} */
close() {
- if (!this.#closed) {
- this.#closed = true;
- core.close(this.#rid);
- for (const rid of SetPrototypeValues(this.managedResources)) {
- SetPrototypeDelete(this.managedResources, rid);
- core.close(rid);
- }
+ for (const rid of SetPrototypeValues(this.managedResources)) {
+ core.tryClose(rid);
}
+ core.close(this.#rid);
}
[SymbolAsyncIterator]() {
@@ -139,86 +136,97 @@
async next() {
const reqEvt = await httpConn.nextRequest();
// Change with caution, current form avoids a v8 deopt
- return { value: reqEvt ?? undefined, done: reqEvt === null };
+ return { value: reqEvt, done: reqEvt === null };
},
};
}
}
- function readRequest(streamRid, buf) {
- return core.opAsync("op_http_read", streamRid, buf);
+ function readRequest(requestRid, zeroCopyBuf) {
+ return core.opAsync(
+ "op_http_request_read",
+ requestRid,
+ zeroCopyBuf,
+ );
}
- function createRespondWith(httpConn, streamRid) {
+ function createRespondWith(httpConn, responseSenderRid, requestRid) {
return async function respondWith(resp) {
- try {
- if (resp instanceof Promise) {
- resp = await resp;
- }
-
- if (!(resp instanceof Response)) {
- throw new TypeError(
- "First argument to respondWith must be a Response or a promise resolving to a Response.",
- );
- }
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
- const innerResp = toInnerResponse(resp);
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
- let respBody = null;
- if (innerResp.body !== null) {
- if (innerResp.body.unusable()) {
- throw new TypeError("Body is unusable.");
- }
- if (innerResp.body.streamOrStatic instanceof ReadableStream) {
- if (
- innerResp.body.length === null ||
- innerResp.body.source instanceof Blob
- ) {
- respBody = innerResp.body.stream;
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (
+ innerResp.body.length === null ||
+ innerResp.body.source instanceof Blob
+ ) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
} else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
- } else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
}
- } else {
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
}
} else {
- respBody = new Uint8Array(0);
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
}
- const isStreamingResponseBody =
- !(typeof respBody === "string" || respBody instanceof Uint8Array);
+ } else {
+ respBody = new Uint8Array(0);
+ }
- try {
- await core.opAsync("op_http_write_headers", [
- streamRid,
+ SetPrototypeDelete(httpConn.managedResources, responseSenderRid);
+ let responseBodyRid;
+ try {
+ responseBodyRid = await core.opAsync(
+ "op_http_response",
+ [
+ responseSenderRid,
innerResp.status ?? 200,
innerResp.headerList,
- ], isStreamingResponseBody ? null : respBody);
- } catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (error instanceof BadResource && connError != null) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
- if (respBody !== null && respBody instanceof ReadableStream) {
- await respBody.cancel(error);
- }
- throw error;
+ ],
+ (respBody instanceof Uint8Array || typeof respBody === "string")
+ ? respBody
+ : null,
+ );
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
}
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
+ }
- if (isStreamingResponseBody) {
+ // If `respond` returns a responseBodyRid, we should stream the body
+ // to that resource.
+ if (responseBodyRid !== null) {
+ SetPrototypeAdd(httpConn.managedResources, responseBodyRid);
+ try {
if (respBody === null || !(respBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
}
@@ -231,7 +239,11 @@
break;
}
try {
- await core.opAsync("op_http_write", streamRid, value);
+ await core.opAsync(
+ "op_http_response_write",
+ responseBodyRid,
+ value,
+ );
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof BadResource && connError != null) {
@@ -242,55 +254,61 @@
throw error;
}
}
+ } finally {
+ // Once all chunks are sent, and the request body is closed, we can
+ // close the response body.
+ SetPrototypeDelete(httpConn.managedResources, responseBodyRid);
try {
- await core.opAsync("op_http_shutdown", streamRid);
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
+ await core.opAsync("op_http_response_close", responseBodyRid);
+ } catch { /* pass */ }
}
+ }
- const ws = resp[_ws];
- if (ws) {
- const wsRid = await core.opAsync(
- "op_http_upgrade_websocket",
- streamRid,
+ const ws = resp[_ws];
+ if (ws) {
+ if (typeof requestRid !== "number") {
+ throw new TypeError(
+ "This request can not be upgraded to a websocket connection.",
);
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+ }
- httpConn.close();
+ const wsRid = await core.opAsync(
+ "op_http_upgrade_websocket",
+ requestRid,
+ );
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
- if (ws[_readyState] === WebSocket.CLOSING) {
- await core.opAsync("op_ws_close", { rid: wsRid });
+ if (ws[_readyState] === WebSocket.CLOSING) {
+ await core.opAsync("op_ws_close", { rid: wsRid });
- ws[_readyState] = WebSocket.CLOSED;
+ ws[_readyState] = WebSocket.CLOSED;
- const errEvent = new ErrorEvent("error");
- ws.dispatchEvent(errEvent);
+ const errEvent = new ErrorEvent("error");
+ ws.dispatchEvent(errEvent);
- const event = new CloseEvent("close");
- ws.dispatchEvent(event);
+ const event = new CloseEvent("close");
+ ws.dispatchEvent(event);
- core.tryClose(wsRid);
- } else {
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
+ core.tryClose(wsRid);
+ } else {
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
- ws[_eventLoop]();
- }
- }
- } finally {
- if (SetPrototypeHas(httpConn.managedResources, streamRid)) {
- SetPrototypeDelete(httpConn.managedResources, streamRid);
- core.close(streamRid);
+ ws[_eventLoop]();
}
+ } else if (typeof requestRid === "number") {
+ // Try to close "request" resource. It might have been already consumed,
+ // but if it hasn't been we need to close it here to avoid resource
+ // leak.
+ SetPrototypeDelete(httpConn.managedResources, requestRid);
+ core.tryClose(requestRid);
}
};
}
- function createRequestBodyStream(streamRid) {
+ function createRequestBodyStream(httpConn, requestRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
@@ -298,21 +316,32 @@
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await readRequest(streamRid, chunk);
+ const read = await readRequest(
+ requestRid,
+ chunk,
+ );
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
+ SetPrototypeDelete(httpConn.managedResources, requestRid);
+ core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
+ SetPrototypeDelete(httpConn.managedResources, requestRid);
+ core.close(requestRid);
}
},
+ cancel() {
+ SetPrototypeDelete(httpConn.managedResources, requestRid);
+ core.close(requestRid);
+ },
});
}