diff options
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r-- | ext/http/service.rs | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs index 787e9babf..ce24dea43 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -2,7 +2,6 @@ use crate::request_properties::HttpConnectionProperties; use crate::response_body::ResponseBytesInner; use crate::response_body::ResponseStreamResult; -use deno_core::error::AnyError; use deno_core::futures::ready; use deno_core::BufView; use deno_core::OpState; @@ -28,6 +27,7 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use std::task::Waker; +use tokio::sync::oneshot; pub type Request = hyper::Request<Incoming>; pub type Response = hyper::Response<HttpRecordResponse>; @@ -206,8 +206,13 @@ pub(crate) async fn handle_request( Ok(response) } +#[derive(Debug, thiserror::Error)] +#[error("upgrade unavailable")] +pub struct UpgradeUnavailableError; + struct HttpRecordInner { server_state: SignallingRc<HttpServerState>, + closed_channel: Option<oneshot::Sender<()>>, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option<RequestBodyState>, @@ -273,6 +278,7 @@ impl HttpRecord { response_body_finished: false, response_body_waker: None, trailers: None, + closed_channel: None, been_dropped: false, finished: false, needs_close_after_finish: false, @@ -309,6 +315,10 @@ impl HttpRecord { RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) } + pub fn on_cancel(&self, sender: oneshot::Sender<()>) { + self.self_mut().closed_channel = Some(sender); + } + fn recycle(self: Rc<Self>) { assert!( Rc::strong_count(&self) == 1, @@ -344,14 +354,14 @@ impl HttpRecord { } /// Perform the Hyper upgrade on this record. - pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> { + pub fn upgrade(&self) -> Result<OnUpgrade, UpgradeUnavailableError> { // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit self .self_mut() .request_parts .extensions .remove::<OnUpgrade>() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) + .ok_or(UpgradeUnavailableError) } /// Take the Hyper body from this record. @@ -387,6 +397,9 @@ impl HttpRecord { inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); + if let Some(closed_channel) = inner.closed_channel.take() { + let _ = closed_channel.send(()); + } } /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). @@ -515,7 +528,7 @@ pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>); impl Body for HttpRecordResponse { type Data = BufView; - type Error = AnyError; + type Error = deno_core::error::AnyError; fn poll_frame( self: Pin<&mut Self>, @@ -640,7 +653,7 @@ mod tests { } #[tokio::test] - async fn test_handle_request() -> Result<(), AnyError> { + async fn test_handle_request() -> Result<(), deno_core::error::AnyError> { let (tx, mut rx) = tokio::sync::mpsc::channel(10); let server_state = HttpServerState::new(); let server_state_check = server_state.clone(); |