summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/serve_test.ts62
-rw-r--r--ext/http/http_next.rs55
-rw-r--r--ext/http/slab.rs8
3 files changed, 103 insertions, 22 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 05affc4f8..d1ac82696 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -2403,6 +2403,68 @@ Deno.test(
},
);
+for (const url of ["text", "file", "stream"]) {
+ // Ensure that we don't panic when the incoming TCP request was dropped
+ // https://github.com/denoland/deno/issues/20315
+ Deno.test({
+ permissions: { read: true, write: true, net: true },
+ name: `httpServerTcpCancellation_${url}`,
+ fn: async function () {
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+ const waitForAbort = deferred();
+ const waitForRequest = deferred();
+ const server = Deno.serve({
+ port: servePort,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ handler: async (req: Request) => {
+ waitForRequest.resolve();
+ await waitForAbort;
+ // Allocate the request body
+ let _body = req.body;
+ if (req.url.includes("/text")) {
+ return new Response("text");
+ } else if (req.url.includes("/file")) {
+ return new Response((await makeTempFile(1024)).readable);
+ } else if (req.url.includes("/stream")) {
+ return new Response(
+ new ReadableStream({
+ start(controller) {
+ _body = null;
+ controller.enqueue(new Uint8Array([1]));
+ controller.close();
+ },
+ }),
+ );
+ } else {
+ fail();
+ }
+ },
+ });
+
+ await listeningPromise;
+
+ // Create a POST request and drop it once the server has received it
+ const conn = await Deno.connect({ port: servePort });
+ const writer = conn.writable.getWriter();
+ writer.write(new TextEncoder().encode(`POST /${url} HTTP/1.0\n\n`));
+ await waitForRequest;
+ writer.close();
+
+ // Give it a few milliseconds for the serve machinery to work
+ await new Promise((r) => setTimeout(r, 10));
+ waitForAbort.resolve();
+
+ // Give it a few milliseconds for the serve machinery to work
+ await new Promise((r) => setTimeout(r, 10));
+
+ ac.abort();
+ await server.finished;
+ },
+ });
+}
+
Deno.test(
{ permissions: { read: true, net: true } },
async function httpServerWithTls() {
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index e8517c901..476a55a80 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -585,39 +585,52 @@ fn set_response(
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
let mut http = slab_get(slab_id);
- let resource = http.take_resource();
- let compression = is_request_compressible(&http.request_parts().headers);
- let response = http.response();
- let compression = modify_compressibility_from_response(
- compression,
- length,
- response.headers_mut(),
- );
- response
- .body_mut()
- .initialize(response_fn(compression), resource);
-
- // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
- // will quitely ignore invalid values.
- if let Ok(code) = StatusCode::from_u16(status) {
- *response.status_mut() = code;
+ // The request may have been cancelled by this point and if so, there's no need for us to
+ // do all of this work to send the response.
+ if !http.cancelled() {
+ let resource = http.take_resource();
+ let compression = is_request_compressible(&http.request_parts().headers);
+ let response = http.response();
+ let compression = modify_compressibility_from_response(
+ compression,
+ length,
+ response.headers_mut(),
+ );
+ response
+ .body_mut()
+ .initialize(response_fn(compression), resource);
+
+ // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
+ // will quitely ignore invalid values.
+ if let Ok(code) = StatusCode::from_u16(status) {
+ *response.status_mut() = code;
+ }
}
http.complete();
}
#[op2(fast)]
pub fn op_http_set_response_body_resource(
- state: &mut OpState,
+ state: Rc<RefCell<OpState>>,
#[smi] slab_id: SlabId,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
+ // IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request
+ // body resource so we _cannot_ hold the OpState lock longer than necessary.
+
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
- let resource = if auto_close {
- state.resource_table.take_any(stream_rid)?
- } else {
- state.resource_table.get_any(stream_rid)?
+ // TODO(mmastrac): We should be using the same auto-close functionality rather than removing autoclose resources.
+ // It's possible things could fail elsewhere if code expects the rid to continue existing after the response has been
+ // returned.
+ let resource = {
+ let mut state = state.borrow_mut();
+ if auto_close {
+ state.resource_table.take_any(stream_rid)?
+ } else {
+ state.resource_table.get_any(stream_rid)?
+ }
};
set_response(
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
index 8c285c860..8dd562cc2 100644
--- a/ext/http/slab.rs
+++ b/ext/http/slab.rs
@@ -234,7 +234,7 @@ impl SlabEntry {
self.self_mut().request_body = Some(RequestBodyState::Resource(res));
}
- /// Complete this entry, potentially expunging it if it is complete.
+ /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well).
pub fn complete(self) {
let promise = &self.self_ref().promise;
assert!(
@@ -251,6 +251,12 @@ impl SlabEntry {
}
}
+ /// Has the future for this entry been dropped? ie, has the underlying TCP connection
+ /// been closed?
+ pub fn cancelled(&self) -> bool {
+ self.self_ref().been_dropped
+ }
+
/// Get a mutable reference to the response.
pub fn response(&mut self) -> &mut Response {
self.self_mut().response.as_mut().unwrap()