diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/fetch/23_request.js | 8 | ||||
-rw-r--r-- | ext/http/00_serve.ts | 13 | ||||
-rw-r--r-- | ext/http/http_next.rs | 13 | ||||
-rw-r--r-- | ext/http/lib.rs | 1 | ||||
-rw-r--r-- | ext/http/service.rs | 10 |
5 files changed, 37 insertions, 8 deletions
diff --git a/ext/fetch/23_request.js b/ext/fetch/23_request.js index 22c17d6d2..61cac22d2 100644 --- a/ext/fetch/23_request.js +++ b/ext/fetch/23_request.js @@ -281,11 +281,11 @@ class Request { if (signal === undefined) { const signal = newSignal(); this[_signalCache] = signal; - return signal; - } + this[_request].onCancel?.(() => { + signal[signalAbort](signalAbortError); + }); - if (!signal.aborted && this[_request].isCancelled) { - signal[signalAbort](signalAbortError); + return signal; } return signal; diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 8cfd7ad53..7bf83e49c 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -11,10 +11,10 @@ import { op_http_cancel, op_http_close, op_http_close_after_finish, - op_http_get_request_cancelled, op_http_get_request_headers, op_http_get_request_method_and_url, op_http_read_request_body, + op_http_request_on_cancel, op_http_serve, op_http_serve_on, op_http_set_promise_complete, @@ -375,11 +375,16 @@ class InnerRequest { return this.#external; } - get isCancelled() { + onCancel(callback) { if (this.#external === null) { - return true; + callback(); + return; } - return op_http_get_request_cancelled(this.#external); + + PromisePrototypeThen( + op_http_request_on_cancel(this.#external), + callback, + ); } } diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 326478fe7..c55e86835 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -708,6 +708,19 @@ pub fn op_http_get_request_cancelled(external: *const c_void) -> bool { http.cancelled() } +#[op2(async)] +pub async fn op_http_request_on_cancel(external: *const c_void) { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_request_on_cancel") }; + let (tx, rx) = tokio::sync::oneshot::channel(); + + http.on_cancel(tx); + drop(http); + + rx.await.ok(); +} + /// Returned promise resolves when body streaming finishes. /// Call [`op_http_close_after_finish`] when done with the external. #[op2(async)] diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9d71e3ad3..49893b1b9 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -112,6 +112,7 @@ deno_core::extension!( http_next::op_http_close_after_finish, http_next::op_http_get_request_header, http_next::op_http_get_request_headers, + http_next::op_http_request_on_cancel, http_next::op_http_get_request_method_and_url<HTTP>, http_next::op_http_get_request_cancelled, http_next::op_http_read_request_body, diff --git a/ext/http/service.rs b/ext/http/service.rs index 75f93d77c..ce24dea43 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -27,6 +27,7 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use std::task::Waker; +use tokio::sync::oneshot; pub type Request = hyper::Request<Incoming>; pub type Response = hyper::Response<HttpRecordResponse>; @@ -211,6 +212,7 @@ pub struct UpgradeUnavailableError; struct HttpRecordInner { server_state: SignallingRc<HttpServerState>, + closed_channel: Option<oneshot::Sender<()>>, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option<RequestBodyState>, @@ -276,6 +278,7 @@ impl HttpRecord { response_body_finished: false, response_body_waker: None, trailers: None, + closed_channel: None, been_dropped: false, finished: false, needs_close_after_finish: false, @@ -312,6 +315,10 @@ impl HttpRecord { RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) } + pub fn on_cancel(&self, sender: oneshot::Sender<()>) { + self.self_mut().closed_channel = Some(sender); + } + fn recycle(self: Rc<Self>) { assert!( Rc::strong_count(&self) == 1, @@ -390,6 +397,9 @@ impl HttpRecord { inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); + if let Some(closed_channel) = inner.closed_channel.take() { + let _ = closed_channel.send(()); + } } /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). |