diff options
-rw-r--r-- | cli/tests/unit/serve_test.ts | 148 | ||||
-rw-r--r-- | ext/http/00_serve.js | 88 |
2 files changed, 141 insertions, 95 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 2bdfbfe3c..6158f587e 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -532,21 +532,43 @@ Deno.test( }, ); -Deno.test( - { permissions: { net: true } }, - async function httpServerStreamResponse() { - const stream = new TransformStream(); - const writer = stream.writable.getWriter(); - writer.write(new TextEncoder().encode("hello ")); - writer.write(new TextEncoder().encode("world")); - writer.close(); +function createStreamTest(count: number, delay: number, action: string) { + function doAction(controller: ReadableStreamDefaultController, i: number) { + if (i == count) { + if (action == "Throw") { + controller.error(new Error("Expected error!")); + } else { + controller.close(); + } + } else { + controller.enqueue(`a${i}`); - const listeningPromise = deferred(); + if (delay == 0) { + doAction(controller, i + 1); + } else { + setTimeout(() => doAction(controller, i + 1), delay); + } + } + } + + function makeStream(count: number, delay: number): ReadableStream { + return new ReadableStream({ + start(controller) { + if (delay == 0) { + doAction(controller, 0); + } else { + setTimeout(() => doAction(controller, 0), delay); + } + }, + }).pipeThrough(new TextEncoderStream()); + } + + Deno.test(`httpServerStreamCount${count}Delay${delay}${action}`, async () => { const ac = new AbortController(); + const listeningPromise = deferred(); const server = Deno.serve({ - handler: (request) => { - assert(!request.body); - return new Response(stream.readable); + handler: async (request) => { + return new Response(makeStream(count, delay)); }, port: 4501, signal: ac.signal, @@ -556,12 +578,34 @@ Deno.test( await listeningPromise; const resp = await fetch("http://127.0.0.1:4501/"); - const respBody = await resp.text(); - assertEquals("hello world", respBody); + const text = await resp.text(); + ac.abort(); await server; - }, -); + let expected = ""; + if (action == "Throw" && count < 2 && delay < 1000) { + // NOTE: This is specific to the current implementation. In some cases where a stream errors, we + // don't send the first packet. + expected = ""; + } else { + for (let i = 0; i < count; i++) { + expected += `a${i}`; + } + } + + assertEquals(text, expected); + }); +} + +for (let count of [0, 1, 2, 3]) { + for (let delay of [0, 1, 1000]) { + // Creating a stream that errors in start will throw + if (delay > 0) { + createStreamTest(count, delay, "Throw"); + } + createStreamTest(count, delay, "Close"); + } +} Deno.test( { permissions: { net: true } }, @@ -1692,78 +1736,6 @@ createServerLengthTest("autoResponseWithUnknownLengthEmpty", { Deno.test( { permissions: { net: true } }, - async function httpServerGetChunkedResponseWithKa() { - const promises = [deferred(), deferred()]; - let reqCount = 0; - const listeningPromise = deferred(); - const ac = new AbortController(); - - const server = Deno.serve({ - handler: async (request) => { - assertEquals(request.method, "GET"); - promises[reqCount].resolve(); - reqCount++; - return new Response(reqCount <= 1 ? stream("foo bar baz") : "zar quux"); - }, - port: 4503, - signal: ac.signal, - onListen: onListen(listeningPromise), - onError: createOnErrorCb(ac), - }); - - await listeningPromise; - const conn = await Deno.connect({ port: 4503 }); - const encoder = new TextEncoder(); - { - const body = - `GET / HTTP/1.1\r\nHost: example.domain\r\nConnection: keep-alive\r\n\r\n`; - const writeResult = await conn.write(encoder.encode(body)); - assertEquals(body.length, writeResult); - await promises[0]; - } - - const decoder = new TextDecoder(); - { - let msg = ""; - while (true) { - try { - const buf = new Uint8Array(1024); - const readResult = await conn.read(buf); - assert(readResult); - msg += decoder.decode(buf.subarray(0, readResult)); - assert(msg.endsWith("\r\nfoo bar baz\r\n0\r\n\r\n")); - break; - } catch { - continue; - } - } - } - - // once more! - { - const body = - `GET /quux HTTP/1.1\r\nHost: example.domain\r\nConnection: close\r\n\r\n`; - const writeResult = await conn.write(encoder.encode(body)); - assertEquals(body.length, writeResult); - await promises[1]; - } - { - const buf = new Uint8Array(1024); - const readResult = await conn.read(buf); - assert(readResult); - const msg = decoder.decode(buf.subarray(0, readResult)); - assert(msg.endsWith("zar quux")); - } - - conn.close(); - - ac.abort(); - await server; - }, -); - -Deno.test( - { permissions: { net: true } }, async function httpServerPostWithContentLengthBody() { const promise = deferred(); const listeningPromise = deferred(); diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 1efa4cddb..56f250d1d 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -28,6 +28,7 @@ import { import { Deferred, getReadableStreamResourceBacking, + readableStreamClose, readableStreamForRid, ReadableStreamPrototype, } from "ext:deno_web/06_streams.js"; @@ -331,24 +332,97 @@ function fastSyncResponseOrStream(req, respBody) { } async function asyncResponse(responseBodies, req, status, stream) { - const responseRid = core.ops.op_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); const reader = stream.getReader(); - core.ops.op_set_promise_complete(req, status); + let responseRid; + let closed = false; + let timeout; + try { + // IMPORTANT: We get a performance boost from this optimization, but V8 is very + // sensitive to the order and structure. Benchmark any changes to this code. + + // Optimize for streams that are done in zero or one packets. We will not + // have to allocate a resource in this case. + const { value: value1, done: done1 } = await reader.read(); + if (done1) { + closed = true; + // Exit 1: no response body at all, extreme fast path + // Reader will be closed by finally block + return; + } + + // The second value cannot block indefinitely, as someone may be waiting on a response + // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms + // and we race it. + let timeoutPromise; + timeout = setTimeout(() => { + responseRid = core.ops.op_set_response_body_stream(req); + SetPrototypeAdd(responseBodies, responseRid); + core.ops.op_set_promise_complete(req, status); + timeoutPromise = core.writeAll(responseRid, value1); + }, 250); + const { value: value2, done: done2 } = await reader.read(); + + if (timeoutPromise) { + await timeoutPromise; + if (done2) { + closed = true; + // Exit 2(a): read 2 is EOS, and timeout resolved. + // Reader will be closed by finally block + // Response stream will be closed by finally block. + return; + } + + // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward. + } else { + clearTimeout(timeout); + timeout = undefined; + + if (done2) { + // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough. + // Reader will be closed by finally block + // No response stream + closed = true; + core.ops.op_set_response_body_bytes(req, value1); + return; + } + + responseRid = core.ops.op_set_response_body_stream(req); + SetPrototypeAdd(responseBodies, responseRid); + core.ops.op_set_promise_complete(req, status); + // Write our first packet + await core.writeAll(responseRid, value1); + } + + await core.writeAll(responseRid, value2); while (true) { const { value, done } = await reader.read(); if (done) { + closed = true; break; } await core.writeAll(responseRid, value); } } catch (error) { - await reader.cancel(error); + closed = true; + try { + await reader.cancel(error); + } catch { + // Pass + } } finally { - core.tryClose(responseRid); - SetPrototypeDelete(responseBodies, responseRid); - reader.releaseLock(); + if (!closed) { + readableStreamClose(reader); + } + if (timeout !== undefined) { + clearTimeout(timeout); + } + if (responseRid) { + core.tryClose(responseRid); + SetPrototypeDelete(responseBodies, responseRid); + } else { + core.ops.op_set_promise_complete(req, status); + } } } |