summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/serve_test.ts50
-rw-r--r--ext/http/http_next.rs11
-rw-r--r--ext/http/response_body.rs34
-rw-r--r--ext/web/stream_resource.rs18
4 files changed, 106 insertions, 7 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 2e7836b6a..830817146 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -2699,12 +2699,14 @@ Deno.test(
for (const url of ["text", "file", "stream"]) {
// Ensure that we don't panic when the incoming TCP request was dropped
- // https://github.com/denoland/deno/issues/20315
+ // https://github.com/denoland/deno/issues/20315 and that we correctly
+ // close/cancel the response
Deno.test({
permissions: { read: true, write: true, net: true },
name: `httpServerTcpCancellation_${url}`,
fn: async function () {
const ac = new AbortController();
+ const streamCancelled = url == "stream" ? deferred() : undefined;
const listeningPromise = deferred();
const waitForAbort = deferred();
const waitForRequest = deferred();
@@ -2727,7 +2729,9 @@ for (const url of ["text", "file", "stream"]) {
start(controller) {
_body = null;
controller.enqueue(new Uint8Array([1]));
- controller.close();
+ },
+ cancel(reason) {
+ streamCancelled!.resolve(reason);
},
}),
);
@@ -2753,15 +2757,57 @@ for (const url of ["text", "file", "stream"]) {
// Give it a few milliseconds for the serve machinery to work
await new Promise((r) => setTimeout(r, 10));
+ // Wait for cancellation before we shut the server down
+ if (streamCancelled !== undefined) {
+ await streamCancelled;
+ }
+
// Since the handler has a chance of creating resources or running async ops, we need to use a
// graceful shutdown here to ensure they have fully drained.
await server.shutdown();
+
await server.finished;
},
});
}
Deno.test(
+ { permissions: { net: true } },
+ async function httpServerCancelFetch() {
+ const request2 = deferred();
+ const request2Aborted = deferred();
+ const { finished, abort } = await makeServer(async (req) => {
+ if (req.url.endsWith("/1")) {
+ const fetchRecursive = await fetch(`http://localhost:${servePort}/2`);
+ return new Response(fetchRecursive.body);
+ } else if (req.url.endsWith("/2")) {
+ request2.resolve();
+ return new Response(
+ new ReadableStream({
+ start(_controller) {/* just hang */},
+ cancel(reason) {
+ request2Aborted.resolve(reason);
+ },
+ }),
+ );
+ }
+ fail();
+ });
+ const fetchAbort = new AbortController();
+ const fetchPromise = await fetch(`http://localhost:${servePort}/1`, {
+ signal: fetchAbort.signal,
+ });
+ await fetchPromise;
+ await request2;
+ fetchAbort.abort();
+ assertEquals("resource closed", await request2Aborted);
+
+ abort();
+ await finished;
+ },
+);
+
+Deno.test(
{ permissions: { read: true, net: true } },
async function httpServerWithTls() {
const ac = new AbortController();
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 21e138f86..7ccd9ec81 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -577,10 +577,13 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
);
}
+/// Sets the appropriate response body. Use `force_instantiate_body` if you need
+/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
slab_id: SlabId,
length: Option<usize>,
status: u16,
+ force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
let mut http = slab_get(slab_id);
@@ -602,7 +605,10 @@ fn set_response(
if let Ok(code) = StatusCode::from_u16(status) {
*response.status_mut() = code;
}
+ } else if force_instantiate_body {
+ response_fn(Compression::None).abort();
}
+
http.complete();
}
@@ -634,6 +640,7 @@ pub fn op_http_set_response_body_resource(
slab_id,
resource.size_hint().1.map(|s| s as usize),
status,
+ true,
move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close)
},
@@ -649,7 +656,7 @@ pub fn op_http_set_response_body_text(
status: u16,
) {
if !text.is_empty() {
- set_response(slab_id, Some(text.len()), status, |compression| {
+ set_response(slab_id, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
@@ -665,7 +672,7 @@ pub fn op_http_set_response_body_bytes(
status: u16,
) {
if !buffer.is_empty() {
- set_response(slab_id, Some(buffer.len()), status, |compression| {
+ set_response(slab_id, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 5c946a4d3..4f7e3b0a5 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -125,6 +125,16 @@ pub enum ResponseStream {
TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
+impl ResponseStream {
+ pub fn abort(self) {
+ match self {
+ ResponseStream::Resource(resource) => resource.stm.close(),
+ #[cfg(test)]
+ ResponseStream::TestChannel(..) => {}
+ }
+ }
+}
+
#[derive(Default)]
pub enum ResponseBytesInner {
/// An empty stream.
@@ -192,11 +202,25 @@ impl ResponseBytes {
let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
self.completion_handle.complete(success);
- current
+ if success {
+ current
+ } else {
+ current.abort();
+ ResponseBytesInner::Done
+ }
}
}
impl ResponseBytesInner {
+ pub fn abort(self) {
+ match self {
+ Self::Done | Self::Empty | Self::Bytes(..) => {}
+ Self::BrotliStream(stm) => stm.abort(),
+ Self::GZipStream(stm) => stm.abort(),
+ Self::UncompressedStream(stm) => stm.abort(),
+ }
+ }
+
pub fn size_hint(&self) -> SizeHint {
match self {
Self::Done => SizeHint::with_exact(0),
@@ -463,6 +487,10 @@ impl GZipResponseStream {
underlying,
}
}
+
+ pub fn abort(self) {
+ self.underlying.abort()
+ }
}
/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
@@ -645,6 +673,10 @@ impl BrotliResponseStream {
underlying,
}
}
+
+ pub fn abort(self) {
+ self.underlying.abort()
+ }
}
fn max_compressed_size(input_size: usize) -> usize {
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index e19954fdc..b35d4c302 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -364,6 +364,15 @@ impl ReadableStreamResource {
.read(limit)
.map(|buf| buf.unwrap_or_else(BufView::empty))
}
+
+ fn close_channel(&self) {
+ // Trigger the promise in JS to cancel the stream if necessarily
+ self.data.completion.complete(true);
+ // Cancel any outstanding read requests
+ self.cancel_handle.cancel();
+ // Close the channel to wake up anyone waiting
+ self.channel.close();
+ }
}
impl Resource for ReadableStreamResource {
@@ -376,8 +385,13 @@ impl Resource for ReadableStreamResource {
}
fn close(self: Rc<Self>) {
- self.cancel_handle.cancel();
- self.channel.close();
+ self.close_channel();
+ }
+}
+
+impl Drop for ReadableStreamResource {
+ fn drop(&mut self) {
+ self.close_channel();
}
}