summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/serve_test.ts148
-rw-r--r--ext/http/00_serve.js88
2 files changed, 141 insertions, 95 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 2bdfbfe3c..6158f587e 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -532,21 +532,43 @@ Deno.test(
},
);
-Deno.test(
- { permissions: { net: true } },
- async function httpServerStreamResponse() {
- const stream = new TransformStream();
- const writer = stream.writable.getWriter();
- writer.write(new TextEncoder().encode("hello "));
- writer.write(new TextEncoder().encode("world"));
- writer.close();
+function createStreamTest(count: number, delay: number, action: string) {
+ function doAction(controller: ReadableStreamDefaultController, i: number) {
+ if (i == count) {
+ if (action == "Throw") {
+ controller.error(new Error("Expected error!"));
+ } else {
+ controller.close();
+ }
+ } else {
+ controller.enqueue(`a${i}`);
- const listeningPromise = deferred();
+ if (delay == 0) {
+ doAction(controller, i + 1);
+ } else {
+ setTimeout(() => doAction(controller, i + 1), delay);
+ }
+ }
+ }
+
+ function makeStream(count: number, delay: number): ReadableStream {
+ return new ReadableStream({
+ start(controller) {
+ if (delay == 0) {
+ doAction(controller, 0);
+ } else {
+ setTimeout(() => doAction(controller, 0), delay);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+ }
+
+ Deno.test(`httpServerStreamCount${count}Delay${delay}${action}`, async () => {
const ac = new AbortController();
+ const listeningPromise = deferred();
const server = Deno.serve({
- handler: (request) => {
- assert(!request.body);
- return new Response(stream.readable);
+ handler: async (request) => {
+ return new Response(makeStream(count, delay));
},
port: 4501,
signal: ac.signal,
@@ -556,12 +578,34 @@ Deno.test(
await listeningPromise;
const resp = await fetch("http://127.0.0.1:4501/");
- const respBody = await resp.text();
- assertEquals("hello world", respBody);
+ const text = await resp.text();
+
ac.abort();
await server;
- },
-);
+ let expected = "";
+ if (action == "Throw" && count < 2 && delay < 1000) {
+ // NOTE: This is specific to the current implementation. In some cases where a stream errors, we
+ // don't send the first packet.
+ expected = "";
+ } else {
+ for (let i = 0; i < count; i++) {
+ expected += `a${i}`;
+ }
+ }
+
+ assertEquals(text, expected);
+ });
+}
+
+for (let count of [0, 1, 2, 3]) {
+ for (let delay of [0, 1, 1000]) {
+ // Creating a stream that errors in start will throw
+ if (delay > 0) {
+ createStreamTest(count, delay, "Throw");
+ }
+ createStreamTest(count, delay, "Close");
+ }
+}
Deno.test(
{ permissions: { net: true } },
@@ -1692,78 +1736,6 @@ createServerLengthTest("autoResponseWithUnknownLengthEmpty", {
Deno.test(
{ permissions: { net: true } },
- async function httpServerGetChunkedResponseWithKa() {
- const promises = [deferred(), deferred()];
- let reqCount = 0;
- const listeningPromise = deferred();
- const ac = new AbortController();
-
- const server = Deno.serve({
- handler: async (request) => {
- assertEquals(request.method, "GET");
- promises[reqCount].resolve();
- reqCount++;
- return new Response(reqCount <= 1 ? stream("foo bar baz") : "zar quux");
- },
- port: 4503,
- signal: ac.signal,
- onListen: onListen(listeningPromise),
- onError: createOnErrorCb(ac),
- });
-
- await listeningPromise;
- const conn = await Deno.connect({ port: 4503 });
- const encoder = new TextEncoder();
- {
- const body =
- `GET / HTTP/1.1\r\nHost: example.domain\r\nConnection: keep-alive\r\n\r\n`;
- const writeResult = await conn.write(encoder.encode(body));
- assertEquals(body.length, writeResult);
- await promises[0];
- }
-
- const decoder = new TextDecoder();
- {
- let msg = "";
- while (true) {
- try {
- const buf = new Uint8Array(1024);
- const readResult = await conn.read(buf);
- assert(readResult);
- msg += decoder.decode(buf.subarray(0, readResult));
- assert(msg.endsWith("\r\nfoo bar baz\r\n0\r\n\r\n"));
- break;
- } catch {
- continue;
- }
- }
- }
-
- // once more!
- {
- const body =
- `GET /quux HTTP/1.1\r\nHost: example.domain\r\nConnection: close\r\n\r\n`;
- const writeResult = await conn.write(encoder.encode(body));
- assertEquals(body.length, writeResult);
- await promises[1];
- }
- {
- const buf = new Uint8Array(1024);
- const readResult = await conn.read(buf);
- assert(readResult);
- const msg = decoder.decode(buf.subarray(0, readResult));
- assert(msg.endsWith("zar quux"));
- }
-
- conn.close();
-
- ac.abort();
- await server;
- },
-);
-
-Deno.test(
- { permissions: { net: true } },
async function httpServerPostWithContentLengthBody() {
const promise = deferred();
const listeningPromise = deferred();
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 1efa4cddb..56f250d1d 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -28,6 +28,7 @@ import {
import {
Deferred,
getReadableStreamResourceBacking,
+ readableStreamClose,
readableStreamForRid,
ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js";
@@ -331,24 +332,97 @@ function fastSyncResponseOrStream(req, respBody) {
}
async function asyncResponse(responseBodies, req, status, stream) {
- const responseRid = core.ops.op_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
const reader = stream.getReader();
- core.ops.op_set_promise_complete(req, status);
+ let responseRid;
+ let closed = false;
+ let timeout;
+
try {
+ // IMPORTANT: We get a performance boost from this optimization, but V8 is very
+ // sensitive to the order and structure. Benchmark any changes to this code.
+
+ // Optimize for streams that are done in zero or one packets. We will not
+ // have to allocate a resource in this case.
+ const { value: value1, done: done1 } = await reader.read();
+ if (done1) {
+ closed = true;
+ // Exit 1: no response body at all, extreme fast path
+ // Reader will be closed by finally block
+ return;
+ }
+
+ // The second value cannot block indefinitely, as someone may be waiting on a response
+ // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
+ // and we race it.
+ let timeoutPromise;
+ timeout = setTimeout(() => {
+ responseRid = core.ops.op_set_response_body_stream(req);
+ SetPrototypeAdd(responseBodies, responseRid);
+ core.ops.op_set_promise_complete(req, status);
+ timeoutPromise = core.writeAll(responseRid, value1);
+ }, 250);
+ const { value: value2, done: done2 } = await reader.read();
+
+ if (timeoutPromise) {
+ await timeoutPromise;
+ if (done2) {
+ closed = true;
+ // Exit 2(a): read 2 is EOS, and timeout resolved.
+ // Reader will be closed by finally block
+ // Response stream will be closed by finally block.
+ return;
+ }
+
+ // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
+ } else {
+ clearTimeout(timeout);
+ timeout = undefined;
+
+ if (done2) {
+ // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
+ // Reader will be closed by finally block
+ // No response stream
+ closed = true;
+ core.ops.op_set_response_body_bytes(req, value1);
+ return;
+ }
+
+ responseRid = core.ops.op_set_response_body_stream(req);
+ SetPrototypeAdd(responseBodies, responseRid);
+ core.ops.op_set_promise_complete(req, status);
+ // Write our first packet
+ await core.writeAll(responseRid, value1);
+ }
+
+ await core.writeAll(responseRid, value2);
while (true) {
const { value, done } = await reader.read();
if (done) {
+ closed = true;
break;
}
await core.writeAll(responseRid, value);
}
} catch (error) {
- await reader.cancel(error);
+ closed = true;
+ try {
+ await reader.cancel(error);
+ } catch {
+ // Pass
+ }
} finally {
- core.tryClose(responseRid);
- SetPrototypeDelete(responseBodies, responseRid);
- reader.releaseLock();
+ if (!closed) {
+ readableStreamClose(reader);
+ }
+ if (timeout !== undefined) {
+ clearTimeout(timeout);
+ }
+ if (responseRid) {
+ core.tryClose(responseRid);
+ SetPrototypeDelete(responseBodies, responseRid);
+ } else {
+ core.ops.op_set_promise_complete(req, status);
+ }
}
}