summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/flash/01_http.js386
1 files changed, 214 insertions, 172 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index b28989ccb..72c0cb125 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -32,6 +32,7 @@
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
+ Promise,
Uint8ArrayPrototype,
} = window.__bootstrap.primordials;
@@ -227,6 +228,192 @@
}
}
+ // TODO(@littledivy): Woah woah, cut down the number of arguments.
+ async function handleResponse(
+ req,
+ resp,
+ body,
+ hasBody,
+ method,
+ serverId,
+ i,
+ respondFast,
+ respondChunked,
+ ) {
+ // there might've been an HTTP upgrade.
+ if (resp === undefined) {
+ return;
+ }
+ const innerResp = toInnerResponse(resp);
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ let isStreamingResponseBody = false;
+ if (innerResp.body !== null) {
+ if (typeof innerResp.body.streamOrStatic?.body === "string") {
+ if (innerResp.body.streamOrStatic.consumed === true) {
+ throw new TypeError("Body is unusable.");
+ }
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ isStreamingResponseBody = false;
+ } else if (
+ ObjectPrototypeIsPrototypeOf(
+ ReadableStreamPrototype,
+ innerResp.body.streamOrStatic,
+ )
+ ) {
+ if (innerResp.body.unusable()) {
+ throw new TypeError("Body is unusable.");
+ }
+ if (
+ innerResp.body.length === null ||
+ ObjectPrototypeIsPrototypeOf(
+ BlobPrototype,
+ innerResp.body.source,
+ )
+ ) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ isStreamingResponseBody = !(
+ typeof respBody === "string" ||
+ ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
+ );
+ } else {
+ if (innerResp.body.streamOrStatic.consumed === true) {
+ throw new TypeError("Body is unusable.");
+ }
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ const ws = resp[_ws];
+ if (isStreamingResponseBody === false) {
+ const length = respBody.byteLength || core.byteLength(respBody);
+ const responseStr = http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ respBody,
+ length,
+ );
+ writeFixedResponse(
+ serverId,
+ i,
+ responseStr,
+ length,
+ !ws, // Don't close socket if there is a deferred websocket upgrade.
+ respondFast,
+ );
+ }
+
+ (async () => {
+ if (!ws) {
+ if (hasBody && body[_state] !== "closed") {
+ // TODO(@littledivy): Optimize by draining in a single op.
+ try {
+ await req.arrayBuffer();
+ } catch { /* pass */ }
+ }
+ }
+
+ if (isStreamingResponseBody === true) {
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ const reader = respBody.getReader(); // Aquire JS lock.
+ try {
+ core.opAsync(
+ "op_flash_write_resource",
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ 0, // Content-Length will be set by the op.
+ null,
+ true,
+ ),
+ serverId,
+ i,
+ resourceRid,
+ ).then(() => {
+ // Release JS lock.
+ readableStreamClose(respBody);
+ });
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
+ } else {
+ const reader = respBody.getReader();
+ writeFixedResponse(
+ serverId,
+ i,
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ respBody.byteLength,
+ null,
+ ),
+ respBody.byteLength,
+ false,
+ respondFast,
+ );
+ while (true) {
+ const { value, done } = await reader.read();
+ await respondChunked(
+ i,
+ value,
+ done,
+ );
+ if (done) break;
+ }
+ }
+ }
+
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_flash_upgrade_websocket",
+ serverId,
+ i,
+ );
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ if (ws[_idleTimeoutDuration]) {
+ ws.addEventListener(
+ "close",
+ () => clearTimeout(ws[_idleTimeoutTimeout]),
+ );
+ }
+ ws[_serverHandleIdleTimeout]();
+ }
+ })();
+ }
+
async function serve(arg1, arg2) {
let options = undefined;
let handler = undefined;
@@ -353,183 +540,38 @@
let resp;
try {
- resp = await handler(req);
- } catch (e) {
- resp = await onError(e);
- }
- // there might've been an HTTP upgrade.
- if (resp === undefined) {
- return;
- }
- const innerResp = toInnerResponse(resp);
-
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
- let respBody = null;
- let isStreamingResponseBody = false;
- if (innerResp.body !== null) {
- if (typeof innerResp.body.streamOrStatic?.body === "string") {
- if (innerResp.body.streamOrStatic.consumed === true) {
- throw new TypeError("Body is unusable.");
- }
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- isStreamingResponseBody = false;
- } else if (
- ObjectPrototypeIsPrototypeOf(
- ReadableStreamPrototype,
- innerResp.body.streamOrStatic,
- )
- ) {
- if (innerResp.body.unusable()) {
- throw new TypeError("Body is unusable.");
- }
- if (
- innerResp.body.length === null ||
- ObjectPrototypeIsPrototypeOf(
- BlobPrototype,
- innerResp.body.source,
- )
- ) {
- respBody = innerResp.body.stream;
- } else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
- } else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
- }
- isStreamingResponseBody = !(
- typeof respBody === "string" ||
- ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
- );
- } else {
- if (innerResp.body.streamOrStatic.consumed === true) {
- throw new TypeError("Body is unusable.");
- }
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- }
- } else {
- respBody = new Uint8Array(0);
- }
-
- const ws = resp[_ws];
- if (isStreamingResponseBody === false) {
- const length = respBody.byteLength || core.byteLength(respBody);
- const responseStr = http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- respBody,
- length,
- );
- writeFixedResponse(
- serverId,
- i,
- responseStr,
- length,
- !ws, // Don't close socket if there is a deferred websocket upgrade.
- respondFast,
- );
- }
-
- (async () => {
- if (!ws) {
- if (hasBody && body[_state] !== "closed") {
- // TODO(@littledivy): Optimize by draining in a single op.
- try {
- await req.arrayBuffer();
- } catch { /* pass */ }
- }
- }
-
- if (isStreamingResponseBody === true) {
- const resourceRid = getReadableStreamRid(respBody);
- if (resourceRid) {
- if (respBody.locked) {
- throw new TypeError("ReadableStream is locked.");
- }
- const reader = respBody.getReader(); // Aquire JS lock.
- try {
- core.opAsync(
- "op_flash_write_resource",
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- 0, // Content-Length will be set by the op.
- null,
- true,
- ),
- serverId,
- i,
- resourceRid,
- ).then(() => {
- // Release JS lock.
- readableStreamClose(respBody);
- });
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
- } else {
- const reader = respBody.getReader();
- writeFixedResponse(
+ resp = handler(req);
+ if (resp instanceof Promise || typeof resp.then === "function") {
+ resp.then((resp) =>
+ handleResponse(
+ req,
+ resp,
+ body,
+ hasBody,
+ method,
serverId,
i,
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- respBody.byteLength,
- null,
- ),
- respBody.byteLength,
- false,
respondFast,
- );
- while (true) {
- const { value, done } = await reader.read();
- await respondChunked(
- i,
- value,
- done,
- );
- if (done) break;
- }
- }
+ respondChunked,
+ )
+ ).catch(onError);
+ continue;
}
+ } catch (e) {
+ resp = await onError(e);
+ }
- if (ws) {
- const wsRid = await core.opAsync(
- "op_flash_upgrade_websocket",
- serverId,
- i,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- if (ws[_idleTimeoutDuration]) {
- ws.addEventListener(
- "close",
- () => clearTimeout(ws[_idleTimeoutTimeout]),
- );
- }
- ws[_serverHandleIdleTimeout]();
- }
- })().catch(onError);
+ handleResponse(
+ req,
+ resp,
+ body,
+ hasBody,
+ method,
+ serverId,
+ i,
+ respondFast,
+ respondChunked,
+ );
}
offset += tokens;