summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-06-14 14:52:49 +0200
committerGitHub <noreply@github.com>2021-06-14 14:52:49 +0200
commit1e1959f6fac6dd0e499532772c8143285cdd81de (patch)
treea321b9351d1d860b4e6026896cc45966cae96c8d
parent5814315b708d154ebb2c29810c16e5af7e726741 (diff)
fix: hang in Deno.serveHttp() (#10923)
Waiting on next request in Deno.serveHttp() API hanged when responses were using ReadableStream. This was caused by op_http_request_next op that was never woken after response was fully written. This commit adds waker field to DenoService which is called after response is finished.
-rw-r--r--Cargo.lock6
-rw-r--r--cli/tests/unit/http_test.ts77
-rw-r--r--runtime/Cargo.toml2
-rw-r--r--runtime/ops/http.rs20
4 files changed, 94 insertions, 11 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 022a609b8..7bd3314b0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1636,9 +1636,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
-version = "0.14.7"
+version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e5f105c494081baa3bf9e200b279e27ec1623895cd504c7dbef8d0b080fcf54"
+checksum = "07d6baa1b441335f3ce5098ac421fb6547c46dda735ca1bc6d0153c838f9dd83"
dependencies = [
"bytes",
"futures-channel",
@@ -1650,7 +1650,7 @@ dependencies = [
"httparse",
"httpdate",
"itoa",
- "pin-project",
+ "pin-project-lite",
"socket2 0.4.0",
"tokio",
"tower-service",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index df599c6f4..4a362a479 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -1,4 +1,8 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+import { chunkedBodyReader } from "../../../test_util/std/http/_io.ts";
+import { BufReader, BufWriter } from "../../../test_util/std/io/bufio.ts";
+import { Buffer } from "../../../test_util/std/io/buffer.ts";
+import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";
import {
assert,
assertEquals,
@@ -6,6 +10,33 @@ import {
unitTest,
} from "./test_util.ts";
+async function writeRequestAndReadResponse(conn: Deno.Conn): Promise<string> {
+ const encoder = new TextEncoder();
+ const decoder = new TextDecoder();
+
+ const w = new BufWriter(conn);
+ const r = new BufReader(conn);
+ const body = `GET / HTTP/1.1\r\nHost: 127.0.0.1:4501\r\n\r\n`;
+ const writeResult = await w.write(encoder.encode(body));
+ assertEquals(body.length, writeResult);
+ await w.flush();
+ const tpr = new TextProtoReader(r);
+ const statusLine = await tpr.readLine();
+ assert(statusLine !== null);
+ const headers = await tpr.readMIMEHeader();
+ assert(headers !== null);
+
+ const chunkedReader = chunkedBodyReader(headers, r);
+ const buf = new Uint8Array(5);
+ const dest = new Buffer();
+ let result: number | null;
+ while ((result = await chunkedReader.read(buf)) !== null) {
+ const len = Math.min(buf.byteLength, result);
+ await dest.write(buf.subarray(0, len));
+ }
+ return decoder.decode(dest.bytes());
+}
+
unitTest({ perms: { net: true } }, async function httpServerBasic() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
@@ -373,3 +404,49 @@ unitTest(
await delay(300);
},
);
+
+unitTest(
+ { perms: { net: true } },
+ // Issue: https://github.com/denoland/deno/issues/10870
+ async function httpServerHang() {
+ // Quick and dirty way to make a readable stream from a string. Alternatively,
+ // `readableStreamFromReader(file)` could be used.
+ function stream(s: string): ReadableStream<Uint8Array> {
+ return new Response(s).body!;
+ }
+
+ const httpConns: Deno.HttpConn[] = [];
+ const promise = (async () => {
+ let count = 0;
+ const listener = Deno.listen({ port: 4501 });
+ for await (const conn of listener) {
+ (async () => {
+ const httpConn = Deno.serveHttp(conn);
+ httpConns.push(httpConn);
+ for await (const { respondWith } of httpConn) {
+ respondWith(new Response(stream("hello")));
+
+ count++;
+ if (count >= 2) {
+ listener.close();
+ }
+ }
+ })();
+ }
+ })();
+
+ const clientConn = await Deno.connect({ port: 4501 });
+
+ const r1 = await writeRequestAndReadResponse(clientConn);
+ assertEquals(r1, "hello");
+
+ const r2 = await writeRequestAndReadResponse(clientConn);
+ assertEquals(r2, "hello");
+
+ clientConn.close();
+ await promise;
+ for (const conn of httpConns) {
+ conn.close();
+ }
+ },
+);
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 0bd8d13c4..356fb4694 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -55,7 +55,7 @@ dlopen = "0.1.8"
encoding_rs = "0.8.28"
filetime = "0.2.14"
http = "0.2.3"
-hyper = { version = "0.14.5", features = ["server", "stream", "http1", "http2", "runtime"] }
+hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
indexmap = "1.6.2"
lazy_static = "1.4.0"
libc = "0.2.93"
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
index fedcb404f..11e83f6c7 100644
--- a/runtime/ops/http.rs
+++ b/runtime/ops/http.rs
@@ -66,6 +66,7 @@ struct ServiceInner {
#[derive(Clone, Default)]
struct Service {
inner: Rc<RefCell<Option<ServiceInner>>>,
+ waker: Rc<deno_core::futures::task::AtomicWaker>,
}
impl HyperService<Request<Body>> for Service {
@@ -160,15 +161,16 @@ async fn op_http_request_next(
let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
poll_fn(|cx| {
+ conn_resource.deno_service.waker.register(cx.waker());
let connection_closed = match conn_resource.poll(cx) {
Poll::Pending => false,
Poll::Ready(Ok(())) => {
- // close ConnResource
- state
+ // try to close ConnResource, but don't unwrap as it might
+ // already be closed
+ let _ = state
.borrow_mut()
.resource_table
- .take::<ConnResource>(conn_rid)
- .unwrap();
+ .take::<ConnResource>(conn_rid);
true
}
Poll::Ready(Err(e)) => {
@@ -188,7 +190,6 @@ async fn op_http_request_next(
}
}
};
-
if let Some(request_resource) =
conn_resource.deno_service.inner.borrow_mut().take()
{
@@ -409,6 +410,9 @@ async fn op_http_response(
})
.await?;
+ if maybe_response_body_rid.is_none() {
+ conn_resource.deno_service.waker.wake();
+ }
Ok(maybe_response_body_rid)
}
@@ -430,11 +434,13 @@ async fn op_http_response_close(
.ok_or_else(bad_resource_id)?;
drop(resource);
- poll_fn(|cx| match conn_resource.poll(cx) {
+ let r = poll_fn(|cx| match conn_resource.poll(cx) {
Poll::Ready(x) => Poll::Ready(x),
Poll::Pending => Poll::Ready(Ok(())),
})
- .await
+ .await;
+ conn_resource.deno_service.waker.wake();
+ r
}
async fn op_http_request_read(