summaryrefslogtreecommitdiff
path: root/ext/http/00_serve.js
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-17 07:52:37 -0600
committerGitHub <noreply@github.com>2023-08-17 07:52:37 -0600
commit23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch)
tree1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/http/00_serve.js
parent0960e895da1275792c1f38999f6a185c864edb3f (diff)
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work. This is a resource wrapper for `ReadableStream`, allowing us to treat all `ReadableStream` instances as resources, and remove special paths in both `fetch` and `serve`. Performance with a ReadableStream response yields ~18% improvement: ``` return new Response(new ReadableStream({ start(controller) { controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100])); controller.close(); } }) ``` This patch: ``` 12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080 Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 99.96us 100.03us 6.65ms 98.84% Req/Sec 47.73k 2.43k 51.02k 89.11% 959308 requests in 10.10s, 117.10MB read Requests/sec: 94978.71 Transfer/sec: 11.59MB ``` main: ``` Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 163.03us 685.51us 19.73ms 99.27% Req/Sec 39.50k 3.98k 66.11k 95.52% 789582 requests in 10.10s, 82.83MB read Requests/sec: 78182.65 Transfer/sec: 8.20MB ```
Diffstat (limited to 'ext/http/00_serve.js')
-rw-r--r--ext/http/00_serve.js154
1 files changed, 18 insertions, 136 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 3447f48e2..265b79706 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -30,9 +30,9 @@ import {
import {
Deferred,
getReadableStreamResourceBacking,
- readableStreamClose,
readableStreamForRid,
ReadableStreamPrototype,
+ resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { listen, TcpConn } from "ext:deno_net/01_net.js";
import { listenTls } from "ext:deno_net/02_tls.js";
@@ -41,10 +41,6 @@ const {
Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
- SafeSet,
- SafeSetIterator,
- SetPrototypeAdd,
- SetPrototypeDelete,
Symbol,
SymbolFor,
TypeError,
@@ -61,7 +57,6 @@ const {
op_http_set_promise_complete,
op_http_set_response_body_bytes,
op_http_set_response_body_resource,
- op_http_set_response_body_stream,
op_http_set_response_body_text,
op_http_set_response_header,
op_http_set_response_headers,
@@ -339,7 +334,6 @@ class InnerRequest {
class CallbackContext {
abortController;
- responseBodies;
scheme;
fallbackHost;
serverRid;
@@ -352,7 +346,6 @@ class CallbackContext {
{ once: true },
);
this.abortController = new AbortController();
- this.responseBodies = new SafeSet();
this.serverRid = args[0];
this.scheme = args[1];
this.fallbackHost = args[2];
@@ -379,23 +372,24 @@ class ServeHandlerInfo {
}
}
-function fastSyncResponseOrStream(req, respBody) {
+function fastSyncResponseOrStream(req, respBody, status) {
if (respBody === null || respBody === undefined) {
// Don't set the body
- return null;
+ op_http_set_promise_complete(req, status);
+ return;
}
const stream = respBody.streamOrStatic;
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
- op_http_set_response_body_bytes(req, body);
- return null;
+ op_http_set_response_body_bytes(req, body, status);
+ return;
}
if (typeof body === "string") {
- op_http_set_response_body_text(req, body);
- return null;
+ op_http_set_response_body_text(req, body, status);
+ return;
}
// At this point in the response it needs to be a stream
@@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) {
req,
resourceBacking.rid,
resourceBacking.autoClose,
+ status,
+ );
+ } else {
+ const rid = resourceForReadableStream(stream);
+ op_http_set_response_body_resource(
+ req,
+ rid,
+ true,
+ status,
);
- return null;
- }
-
- return stream;
-}
-
-async function asyncResponse(responseBodies, req, status, stream) {
- const reader = stream.getReader();
- let responseRid;
- let closed = false;
- let timeout;
-
- try {
- // IMPORTANT: We get a performance boost from this optimization, but V8 is very
- // sensitive to the order and structure. Benchmark any changes to this code.
-
- // Optimize for streams that are done in zero or one packets. We will not
- // have to allocate a resource in this case.
- const { value: value1, done: done1 } = await reader.read();
- if (done1) {
- closed = true;
- // Exit 1: no response body at all, extreme fast path
- // Reader will be closed by finally block
- return;
- }
-
- // The second value cannot block indefinitely, as someone may be waiting on a response
- // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
- // and we race it.
- let timeoutPromise;
- timeout = setTimeout(() => {
- responseRid = op_http_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
- op_http_set_promise_complete(req, status);
- // TODO(mmastrac): if this promise fails before we get to the await below, it crashes
- // the process with an error:
- //
- // 'Uncaught (in promise) BadResource: failed to write'.
- //
- // To avoid this, we're going to swallow errors here and allow the code later in the
- // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection.
- timeoutPromise = PromisePrototypeCatch(
- core.writeAll(responseRid, value1),
- () => null,
- );
- }, 250);
- const { value: value2, done: done2 } = await reader.read();
-
- if (timeoutPromise) {
- await timeoutPromise;
- if (done2) {
- closed = true;
- // Exit 2(a): read 2 is EOS, and timeout resolved.
- // Reader will be closed by finally block
- // Response stream will be closed by finally block.
- return;
- }
-
- // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
- } else {
- clearTimeout(timeout);
- timeout = undefined;
-
- if (done2) {
- // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
- // Reader will be closed by finally block
- // No response stream
- closed = true;
- op_http_set_response_body_bytes(req, value1);
- return;
- }
-
- responseRid = op_http_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
- op_http_set_promise_complete(req, status);
- // Write our first packet
- await core.writeAll(responseRid, value1);
- }
-
- await core.writeAll(responseRid, value2);
- while (true) {
- const { value, done } = await reader.read();
- if (done) {
- closed = true;
- break;
- }
- await core.writeAll(responseRid, value);
- }
- } catch (error) {
- closed = true;
- try {
- await reader.cancel(error);
- } catch {
- // Pass
- }
- } finally {
- if (!closed) {
- readableStreamClose(reader);
- }
- if (timeout !== undefined) {
- clearTimeout(timeout);
- }
- if (responseRid) {
- core.tryClose(responseRid);
- SetPrototypeDelete(responseBodies, responseRid);
- } else {
- op_http_set_promise_complete(req, status);
- }
}
}
@@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) {
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
- const responseBodies = context.responseBodies;
const signal = context.abortController.signal;
const hasCallback = callback.length > 0;
const hasOneCallback = callback.length === 1;
@@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) {
}
}
- // Attempt to respond quickly to this request, otherwise extract the stream
- const stream = fastSyncResponseOrStream(req, inner.body);
- if (stream !== null) {
- // Handle the stream asynchronously
- await asyncResponse(responseBodies, req, status, stream);
- } else {
- op_http_set_promise_complete(req, status);
- }
-
+ fastSyncResponseOrStream(req, inner.body, status);
innerRequest?.close();
};
}
@@ -755,10 +641,6 @@ function serveHttpOn(context, callback) {
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
-
- for (const streamRid of new SafeSetIterator(context.responseBodies)) {
- core.tryClose(streamRid);
- }
})();
return {