summaryrefslogtreecommitdiff
path: root/ext/flash/01_http.js
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-08-19 10:14:56 +0530
committerGitHub <noreply@github.com>2022-08-19 10:14:56 +0530
commit8bdcec1c84636aa00bf7444539e68b49d79b1fbf (patch)
tree77fb6ea2aadda9b50a839ac858f0a11073c06521 /ext/flash/01_http.js
parentcd21cff29942f24ba7d38287186cce64d0e84e56 (diff)
fix(ext/flash): concurrent response streams (#15493)
Diffstat (limited to 'ext/flash/01_http.js')
-rw-r--r--ext/flash/01_http.js212
1 files changed, 109 insertions, 103 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index fd817219e..19920da58 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -237,20 +237,21 @@
await server.finished;
},
async serve() {
+ let offset = 0;
while (true) {
if (server.closed) {
break;
}
- let token = nextRequestSync();
- if (token === 0) {
- token = await core.opAsync("op_flash_next_async", serverId);
+ let tokens = nextRequestSync();
+ if (tokens === 0) {
+ tokens = await core.opAsync("op_flash_next_async", serverId);
if (server.closed) {
break;
}
}
- for (let i = 0; i < token; i++) {
+ for (let i = offset; i < offset + tokens; i++) {
let body = null;
// There might be a body, but we don't expose it for GET/HEAD requests.
// It will be closed automatically once the request has been handled and
@@ -290,17 +291,6 @@
if (resp === undefined) {
continue;
}
-
- const ws = resp[_ws];
- if (!ws) {
- if (hasBody && body[_state] !== "closed") {
- // TODO(@littledivy): Optimize by draining in a single op.
- try {
- await req.arrayBuffer();
- } catch { /* pass */ }
- }
- }
-
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
@@ -360,74 +350,8 @@
respBody = new Uint8Array(0);
}
- if (isStreamingResponseBody === true) {
- const resourceRid = getReadableStreamRid(respBody);
- if (resourceRid) {
- if (respBody.locked) {
- throw new TypeError("ReadableStream is locked.");
- }
- const reader = respBody.getReader(); // Aquire JS lock.
- try {
- core.opAsync(
- "op_flash_write_resource",
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- true,
- ),
- serverId,
- i,
- resourceRid,
- ).then(() => {
- // Release JS lock.
- readableStreamClose(respBody);
- });
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
- } else {
- const reader = respBody.getReader();
- let first = true;
- a:
- while (true) {
- const { value, done } = await reader.read();
- if (first) {
- first = false;
- core.ops.op_flash_respond(
- serverId,
- i,
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- ),
- value ?? new Uint8Array(),
- false,
- );
- } else {
- if (value === undefined) {
- core.ops.op_flash_respond_chuncked(
- serverId,
- i,
- undefined,
- done,
- );
- } else {
- respondChunked(
- i,
- value,
- done,
- );
- }
- }
- if (done) break a;
- }
- }
- } else {
+ const ws = resp[_ws];
+ if (isStreamingResponseBody === false) {
const responseStr = http1Response(
method,
innerResp.status ?? 200,
@@ -456,29 +380,111 @@
}
}
- if (ws) {
- const wsRid = await core.opAsync(
- "op_flash_upgrade_websocket",
- serverId,
- i,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- if (ws[_idleTimeoutDuration]) {
- ws.addEventListener(
- "close",
- () => clearTimeout(ws[_idleTimeoutTimeout]),
+ (async () => {
+ if (!ws) {
+ if (hasBody && body[_state] !== "closed") {
+ // TODO(@littledivy): Optimize by draining in a single op.
+ try {
+ await req.arrayBuffer();
+ } catch { /* pass */ }
+ }
+ }
+
+ if (isStreamingResponseBody === true) {
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ const reader = respBody.getReader(); // Aquire JS lock.
+ try {
+ core.opAsync(
+ "op_flash_write_resource",
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ true,
+ ),
+ serverId,
+ i,
+ resourceRid,
+ ).then(() => {
+ // Release JS lock.
+ readableStreamClose(respBody);
+ });
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
+ } else {
+ const reader = respBody.getReader();
+ let first = true;
+ a:
+ while (true) {
+ const { value, done } = await reader.read();
+ if (first) {
+ first = false;
+ core.ops.op_flash_respond(
+ serverId,
+ i,
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ ),
+ value ?? new Uint8Array(),
+ false,
+ );
+ } else {
+ if (value === undefined) {
+ core.ops.op_flash_respond_chuncked(
+ serverId,
+ i,
+ undefined,
+ done,
+ );
+ } else {
+ respondChunked(
+ i,
+ value,
+ done,
+ );
+ }
+ }
+ if (done) break a;
+ }
+ }
+ }
+
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_flash_upgrade_websocket",
+ serverId,
+ i,
);
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ if (ws[_idleTimeoutDuration]) {
+ ws.addEventListener(
+ "close",
+ () => clearTimeout(ws[_idleTimeoutTimeout]),
+ );
+ }
+ ws[_serverHandleIdleTimeout]();
}
- ws[_serverHandleIdleTimeout]();
- }
+ })().catch(onError);
}
+
+ offset += tokens;
}
await server.finished;
},