diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2022-08-19 10:14:56 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-19 10:14:56 +0530 |
commit | 8bdcec1c84636aa00bf7444539e68b49d79b1fbf (patch) | |
tree | 77fb6ea2aadda9b50a839ac858f0a11073c06521 /ext/flash/01_http.js | |
parent | cd21cff29942f24ba7d38287186cce64d0e84e56 (diff) |
fix(ext/flash): concurrent response streams (#15493)
Diffstat (limited to 'ext/flash/01_http.js')
-rw-r--r-- | ext/flash/01_http.js | 212 |
1 files changed, 109 insertions, 103 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index fd817219e..19920da58 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -237,20 +237,21 @@ await server.finished; }, async serve() { + let offset = 0; while (true) { if (server.closed) { break; } - let token = nextRequestSync(); - if (token === 0) { - token = await core.opAsync("op_flash_next_async", serverId); + let tokens = nextRequestSync(); + if (tokens === 0) { + tokens = await core.opAsync("op_flash_next_async", serverId); if (server.closed) { break; } } - for (let i = 0; i < token; i++) { + for (let i = offset; i < offset + tokens; i++) { let body = null; // There might be a body, but we don't expose it for GET/HEAD requests. // It will be closed automatically once the request has been handled and @@ -290,17 +291,6 @@ if (resp === undefined) { continue; } - - const ws = resp[_ws]; - if (!ws) { - if (hasBody && body[_state] !== "closed") { - // TODO(@littledivy): Optimize by draining in a single op. - try { - await req.arrayBuffer(); - } catch { /* pass */ } - } - } - const innerResp = toInnerResponse(resp); // If response body length is known, it will be sent synchronously in a @@ -360,74 +350,8 @@ respBody = new Uint8Array(0); } - if (isStreamingResponseBody === true) { - const resourceRid = getReadableStreamRid(respBody); - if (resourceRid) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); - } - const reader = respBody.getReader(); // Aquire JS lock. - try { - core.opAsync( - "op_flash_write_resource", - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - true, - ), - serverId, - i, - resourceRid, - ).then(() => { - // Release JS lock. - readableStreamClose(respBody); - }); - } catch (error) { - await reader.cancel(error); - throw error; - } - } else { - const reader = respBody.getReader(); - let first = true; - a: - while (true) { - const { value, done } = await reader.read(); - if (first) { - first = false; - core.ops.op_flash_respond( - serverId, - i, - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - ), - value ?? new Uint8Array(), - false, - ); - } else { - if (value === undefined) { - core.ops.op_flash_respond_chuncked( - serverId, - i, - undefined, - done, - ); - } else { - respondChunked( - i, - value, - done, - ); - } - } - if (done) break a; - } - } - } else { + const ws = resp[_ws]; + if (isStreamingResponseBody === false) { const responseStr = http1Response( method, innerResp.status ?? 200, @@ -456,29 +380,111 @@ } } - if (ws) { - const wsRid = await core.opAsync( - "op_flash_upgrade_websocket", - serverId, - i, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - if (ws[_idleTimeoutDuration]) { - ws.addEventListener( - "close", - () => clearTimeout(ws[_idleTimeoutTimeout]), + (async () => { + if (!ws) { + if (hasBody && body[_state] !== "closed") { + // TODO(@littledivy): Optimize by draining in a single op. + try { + await req.arrayBuffer(); + } catch { /* pass */ } + } + } + + if (isStreamingResponseBody === true) { + const resourceRid = getReadableStreamRid(respBody); + if (resourceRid) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); + } + const reader = respBody.getReader(); // Aquire JS lock. + try { + core.opAsync( + "op_flash_write_resource", + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + null, + true, + ), + serverId, + i, + resourceRid, + ).then(() => { + // Release JS lock. + readableStreamClose(respBody); + }); + } catch (error) { + await reader.cancel(error); + throw error; + } + } else { + const reader = respBody.getReader(); + let first = true; + a: + while (true) { + const { value, done } = await reader.read(); + if (first) { + first = false; + core.ops.op_flash_respond( + serverId, + i, + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + null, + ), + value ?? new Uint8Array(), + false, + ); + } else { + if (value === undefined) { + core.ops.op_flash_respond_chuncked( + serverId, + i, + undefined, + done, + ); + } else { + respondChunked( + i, + value, + done, + ); + } + } + if (done) break a; + } + } + } + + if (ws) { + const wsRid = await core.opAsync( + "op_flash_upgrade_websocket", + serverId, + i, ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); } - ws[_serverHandleIdleTimeout](); - } + })().catch(onError); } + + offset += tokens; } await server.finished; }, |