summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-06-14 22:10:55 +0200
committerGitHub <noreply@github.com>2021-06-14 22:10:55 +0200
commit1246a433f8101c03491c1f1e9e0d51d79a025956 (patch)
tree10f96ad60bfef424f32f88388f31b5e1a240d2fc
parentf48d66b2b01fef0f16beb35a66f1b4d5771e3b6e (diff)
fix: poll connection after writing response chunk in Deno.serveHttp() (#10961)
This commit changes "op_http_response_write" to first send response chunk and then poll the underlying HTTP connection. Previously after writing a chunk of response HTTP connection wasn't polled and thus data wasn't written to the socket until after next op interacting with the connection.
-rw-r--r--cli/tests/unit/http_test.ts91
-rw-r--r--runtime/ops/http.rs5
2 files changed, 95 insertions, 1 deletions
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 4a362a479..d84a4bdd2 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -7,6 +7,7 @@ import {
assert,
assertEquals,
assertThrowsAsync,
+ deferred,
unitTest,
} from "./test_util.ts";
@@ -450,3 +451,93 @@ unitTest(
}
},
);
+
+unitTest(
+ { perms: { net: true } },
+ // Issue: https://github.com/denoland/deno/issues/10930
+ async function httpServerStreamingResponse() {
+ // This test enqueues a single chunk for readable
+ // stream and waits for client to read that chunk and signal
+ // it before enqueueing subsequent chunk. Issue linked above
+ // presented a situation where enqueued chunks were not
+ // written to the HTTP connection until the next chunk was enqueued.
+
+ let counter = 0;
+
+ const deferreds = [
+ deferred(),
+ deferred(),
+ deferred(),
+ ];
+
+ async function writeRequest(conn: Deno.Conn) {
+ const encoder = new TextEncoder();
+ const decoder = new TextDecoder();
+
+ const w = new BufWriter(conn);
+ const r = new BufReader(conn);
+ const body = `GET / HTTP/1.1\r\nHost: 127.0.0.1:4501\r\n\r\n`;
+ const writeResult = await w.write(encoder.encode(body));
+ assertEquals(body.length, writeResult);
+ await w.flush();
+ const tpr = new TextProtoReader(r);
+ const statusLine = await tpr.readLine();
+ assert(statusLine !== null);
+ const headers = await tpr.readMIMEHeader();
+ assert(headers !== null);
+
+ const chunkedReader = chunkedBodyReader(headers, r);
+ const buf = new Uint8Array(5);
+ const dest = new Buffer();
+ let result: number | null;
+ while ((result = await chunkedReader.read(buf)) !== null) {
+ const len = Math.min(buf.byteLength, result);
+ await dest.write(buf.subarray(0, len));
+ // Resolve a deferred - this will make response stream to
+ // enqueue next chunk.
+ deferreds[counter - 1].resolve();
+ }
+ return decoder.decode(dest.bytes());
+ }
+
+ function periodicStream() {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue(`${counter}\n`);
+ counter++;
+ },
+
+ async pull(controller) {
+ if (counter >= 3) {
+ return controller.close();
+ }
+
+ await deferreds[counter - 1];
+
+ controller.enqueue(`${counter}\n`);
+ counter++;
+ },
+ }).pipeThrough(new TextEncoderStream());
+ }
+
+ const listener = Deno.listen({ port: 4501 });
+ const finished = (async () => {
+ const conn = await listener.accept();
+ const httpConn = Deno.serveHttp(conn);
+ const requestEvent = await httpConn.nextRequest();
+ const { respondWith } = requestEvent!;
+ await respondWith(new Response(periodicStream()));
+ httpConn.close();
+ })();
+
+ // start a client
+ const clientConn = await Deno.connect({ port: 4501 });
+
+ const r1 = await writeRequest(clientConn);
+ assertEquals(r1, "0\n1\n2\n");
+
+ await finished;
+ clientConn.close();
+ listener.close();
+ },
+);
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
index 11e83f6c7..3e8a4ada7 100644
--- a/runtime/ops/http.rs
+++ b/runtime/ops/http.rs
@@ -502,6 +502,9 @@ async fn op_http_response_write(
let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
poll_fn(|cx| {
+ let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
+
+ // Poll connection so the data is flushed
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
// close ConnResource
// close RequestResource associated with connection
@@ -509,7 +512,7 @@ async fn op_http_response_write(
return Poll::Ready(Err(e));
}
- send_data_fut.poll_unpin(cx).map_err(AnyError::from)
+ r
})
.await?;