summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2024-11-08 18:46:11 +0530
committerGitHub <noreply@github.com>2024-11-08 18:46:11 +0530
commitb482a50299ae4f636a186038460e54af65e2b627 (patch)
tree0d9cb0f2348391b438efa133afd391b189ec0e30 /ext
parent637b1d5508293ed02bef2f317b30bb7c1f0cbc71 (diff)
feat(ext/http): abort event when request is cancelled (#26781)
```js Deno.serve(async (req) => { const { promise, resolve } = Promise.withResolvers<void>(); req.signal.addEventListener("abort", () => { resolve(); }); await promise; return new Response("Ok"); }); ```
Diffstat (limited to 'ext')
-rw-r--r--ext/fetch/23_request.js8
-rw-r--r--ext/http/00_serve.ts13
-rw-r--r--ext/http/http_next.rs13
-rw-r--r--ext/http/lib.rs1
-rw-r--r--ext/http/service.rs10
5 files changed, 37 insertions, 8 deletions
diff --git a/ext/fetch/23_request.js b/ext/fetch/23_request.js
index 22c17d6d2..61cac22d2 100644
--- a/ext/fetch/23_request.js
+++ b/ext/fetch/23_request.js
@@ -281,11 +281,11 @@ class Request {
if (signal === undefined) {
const signal = newSignal();
this[_signalCache] = signal;
- return signal;
- }
+ this[_request].onCancel?.(() => {
+ signal[signalAbort](signalAbortError);
+ });
- if (!signal.aborted && this[_request].isCancelled) {
- signal[signalAbort](signalAbortError);
+ return signal;
}
return signal;
diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts
index 8cfd7ad53..7bf83e49c 100644
--- a/ext/http/00_serve.ts
+++ b/ext/http/00_serve.ts
@@ -11,10 +11,10 @@ import {
op_http_cancel,
op_http_close,
op_http_close_after_finish,
- op_http_get_request_cancelled,
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
+ op_http_request_on_cancel,
op_http_serve,
op_http_serve_on,
op_http_set_promise_complete,
@@ -375,11 +375,16 @@ class InnerRequest {
return this.#external;
}
- get isCancelled() {
+ onCancel(callback) {
if (this.#external === null) {
- return true;
+ callback();
+ return;
}
- return op_http_get_request_cancelled(this.#external);
+
+ PromisePrototypeThen(
+ op_http_request_on_cancel(this.#external),
+ callback,
+ );
}
}
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 326478fe7..c55e86835 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -708,6 +708,19 @@ pub fn op_http_get_request_cancelled(external: *const c_void) -> bool {
http.cancelled()
}
+#[op2(async)]
+pub async fn op_http_request_on_cancel(external: *const c_void) {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_request_on_cancel") };
+ let (tx, rx) = tokio::sync::oneshot::channel();
+
+ http.on_cancel(tx);
+ drop(http);
+
+ rx.await.ok();
+}
+
/// Returned promise resolves when body streaming finishes.
/// Call [`op_http_close_after_finish`] when done with the external.
#[op2(async)]
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 9d71e3ad3..49893b1b9 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -112,6 +112,7 @@ deno_core::extension!(
http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
+ http_next::op_http_request_on_cancel,
http_next::op_http_get_request_method_and_url<HTTP>,
http_next::op_http_get_request_cancelled,
http_next::op_http_read_request_body,
diff --git a/ext/http/service.rs b/ext/http/service.rs
index 75f93d77c..ce24dea43 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -27,6 +27,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
+use tokio::sync::oneshot;
pub type Request = hyper::Request<Incoming>;
pub type Response = hyper::Response<HttpRecordResponse>;
@@ -211,6 +212,7 @@ pub struct UpgradeUnavailableError;
struct HttpRecordInner {
server_state: SignallingRc<HttpServerState>,
+ closed_channel: Option<oneshot::Sender<()>>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@@ -276,6 +278,7 @@ impl HttpRecord {
response_body_finished: false,
response_body_waker: None,
trailers: None,
+ closed_channel: None,
been_dropped: false,
finished: false,
needs_close_after_finish: false,
@@ -312,6 +315,10 @@ impl HttpRecord {
RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
}
+ pub fn on_cancel(&self, sender: oneshot::Sender<()>) {
+ self.self_mut().closed_channel = Some(sender);
+ }
+
fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
@@ -390,6 +397,9 @@ impl HttpRecord {
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
+ if let Some(closed_channel) = inner.closed_channel.take() {
+ let _ = closed_channel.send(());
+ }
}
/// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).