diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-08-17 07:52:37 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-17 07:52:37 -0600 |
commit | 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch) | |
tree | 1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/http/00_serve.js | |
parent | 0960e895da1275792c1f38999f6a185c864edb3f (diff) |
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.
This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.
Performance with a ReadableStream response yields ~18% improvement:
```
return new Response(new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
controller.close();
}
})
```
This patch:
```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 99.96us 100.03us 6.65ms 98.84%
Req/Sec 47.73k 2.43k 51.02k 89.11%
959308 requests in 10.10s, 117.10MB read
Requests/sec: 94978.71
Transfer/sec: 11.59MB
```
main:
```
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 163.03us 685.51us 19.73ms 99.27%
Req/Sec 39.50k 3.98k 66.11k 95.52%
789582 requests in 10.10s, 82.83MB read
Requests/sec: 78182.65
Transfer/sec: 8.20MB
```
Diffstat (limited to 'ext/http/00_serve.js')
-rw-r--r-- | ext/http/00_serve.js | 154 |
1 files changed, 18 insertions, 136 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 3447f48e2..265b79706 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -30,9 +30,9 @@ import { import { Deferred, getReadableStreamResourceBacking, - readableStreamClose, readableStreamForRid, ReadableStreamPrototype, + resourceForReadableStream, } from "ext:deno_web/06_streams.js"; import { listen, TcpConn } from "ext:deno_net/01_net.js"; import { listenTls } from "ext:deno_net/02_tls.js"; @@ -41,10 +41,6 @@ const { Error, ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, - SafeSet, - SafeSetIterator, - SetPrototypeAdd, - SetPrototypeDelete, Symbol, SymbolFor, TypeError, @@ -61,7 +57,6 @@ const { op_http_set_promise_complete, op_http_set_response_body_bytes, op_http_set_response_body_resource, - op_http_set_response_body_stream, op_http_set_response_body_text, op_http_set_response_header, op_http_set_response_headers, @@ -339,7 +334,6 @@ class InnerRequest { class CallbackContext { abortController; - responseBodies; scheme; fallbackHost; serverRid; @@ -352,7 +346,6 @@ class CallbackContext { { once: true }, ); this.abortController = new AbortController(); - this.responseBodies = new SafeSet(); this.serverRid = args[0]; this.scheme = args[1]; this.fallbackHost = args[2]; @@ -379,23 +372,24 @@ class ServeHandlerInfo { } } -function fastSyncResponseOrStream(req, respBody) { +function fastSyncResponseOrStream(req, respBody, status) { if (respBody === null || respBody === undefined) { // Don't set the body - return null; + op_http_set_promise_complete(req, status); + return; } const stream = respBody.streamOrStatic; const body = stream.body; if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { - op_http_set_response_body_bytes(req, body); - return null; + op_http_set_response_body_bytes(req, body, status); + return; } if (typeof body === "string") { - op_http_set_response_body_text(req, body); - return null; + op_http_set_response_body_text(req, body, status); + return; } // At this point in the response it needs to be a stream @@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) { req, resourceBacking.rid, resourceBacking.autoClose, + status, + ); + } else { + const rid = resourceForReadableStream(stream); + op_http_set_response_body_resource( + req, + rid, + true, + status, ); - return null; - } - - return stream; -} - -async function asyncResponse(responseBodies, req, status, stream) { - const reader = stream.getReader(); - let responseRid; - let closed = false; - let timeout; - - try { - // IMPORTANT: We get a performance boost from this optimization, but V8 is very - // sensitive to the order and structure. Benchmark any changes to this code. - - // Optimize for streams that are done in zero or one packets. We will not - // have to allocate a resource in this case. - const { value: value1, done: done1 } = await reader.read(); - if (done1) { - closed = true; - // Exit 1: no response body at all, extreme fast path - // Reader will be closed by finally block - return; - } - - // The second value cannot block indefinitely, as someone may be waiting on a response - // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms - // and we race it. - let timeoutPromise; - timeout = setTimeout(() => { - responseRid = op_http_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); - op_http_set_promise_complete(req, status); - // TODO(mmastrac): if this promise fails before we get to the await below, it crashes - // the process with an error: - // - // 'Uncaught (in promise) BadResource: failed to write'. - // - // To avoid this, we're going to swallow errors here and allow the code later in the - // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection. - timeoutPromise = PromisePrototypeCatch( - core.writeAll(responseRid, value1), - () => null, - ); - }, 250); - const { value: value2, done: done2 } = await reader.read(); - - if (timeoutPromise) { - await timeoutPromise; - if (done2) { - closed = true; - // Exit 2(a): read 2 is EOS, and timeout resolved. - // Reader will be closed by finally block - // Response stream will be closed by finally block. - return; - } - - // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward. - } else { - clearTimeout(timeout); - timeout = undefined; - - if (done2) { - // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough. - // Reader will be closed by finally block - // No response stream - closed = true; - op_http_set_response_body_bytes(req, value1); - return; - } - - responseRid = op_http_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); - op_http_set_promise_complete(req, status); - // Write our first packet - await core.writeAll(responseRid, value1); - } - - await core.writeAll(responseRid, value2); - while (true) { - const { value, done } = await reader.read(); - if (done) { - closed = true; - break; - } - await core.writeAll(responseRid, value); - } - } catch (error) { - closed = true; - try { - await reader.cancel(error); - } catch { - // Pass - } - } finally { - if (!closed) { - readableStreamClose(reader); - } - if (timeout !== undefined) { - clearTimeout(timeout); - } - if (responseRid) { - core.tryClose(responseRid); - SetPrototypeDelete(responseBodies, responseRid); - } else { - op_http_set_promise_complete(req, status); - } } } @@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) { * This function returns a promise that will only reject in the case of abnormal exit. */ function mapToCallback(context, callback, onError) { - const responseBodies = context.responseBodies; const signal = context.abortController.signal; const hasCallback = callback.length > 0; const hasOneCallback = callback.length === 1; @@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) { } } - // Attempt to respond quickly to this request, otherwise extract the stream - const stream = fastSyncResponseOrStream(req, inner.body); - if (stream !== null) { - // Handle the stream asynchronously - await asyncResponse(responseBodies, req, status, stream); - } else { - op_http_set_promise_complete(req, status); - } - + fastSyncResponseOrStream(req, inner.body, status); innerRequest?.close(); }; } @@ -755,10 +641,6 @@ function serveHttpOn(context, callback) { } PromisePrototypeCatch(callback(req), promiseErrorHandler); } - - for (const streamRid of new SafeSetIterator(context.responseBodies)) { - core.tryClose(streamRid); - } })(); return { |