summaryrefslogtreecommitdiff
path: root/ext/http/service.rs
diff options
context:
space:
mode:
authorhaturau <135221985+haturatu@users.noreply.github.com>2024-11-20 01:20:47 +0900
committerGitHub <noreply@github.com>2024-11-20 01:20:47 +0900
commit85719a67e59c7aa45bead26e4942d7df8b1b42d4 (patch)
treeface0aecaac53e93ce2f23b53c48859bcf1a36ec /ext/http/service.rs
parent67697bc2e4a62a9670699fd18ad0dd8efc5bd955 (diff)
parent186b52731c6bb326c4d32905c5e732d082e83465 (diff)
Merge branch 'denoland:main' into main
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r--ext/http/service.rs23
1 files changed, 18 insertions, 5 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs
index 787e9babf..ce24dea43 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -2,7 +2,6 @@
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::ResponseBytesInner;
use crate::response_body::ResponseStreamResult;
-use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::BufView;
use deno_core::OpState;
@@ -28,6 +27,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
+use tokio::sync::oneshot;
pub type Request = hyper::Request<Incoming>;
pub type Response = hyper::Response<HttpRecordResponse>;
@@ -206,8 +206,13 @@ pub(crate) async fn handle_request(
Ok(response)
}
+#[derive(Debug, thiserror::Error)]
+#[error("upgrade unavailable")]
+pub struct UpgradeUnavailableError;
+
struct HttpRecordInner {
server_state: SignallingRc<HttpServerState>,
+ closed_channel: Option<oneshot::Sender<()>>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@@ -273,6 +278,7 @@ impl HttpRecord {
response_body_finished: false,
response_body_waker: None,
trailers: None,
+ closed_channel: None,
been_dropped: false,
finished: false,
needs_close_after_finish: false,
@@ -309,6 +315,10 @@ impl HttpRecord {
RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
}
+ pub fn on_cancel(&self, sender: oneshot::Sender<()>) {
+ self.self_mut().closed_channel = Some(sender);
+ }
+
fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
@@ -344,14 +354,14 @@ impl HttpRecord {
}
/// Perform the Hyper upgrade on this record.
- pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> {
+ pub fn upgrade(&self) -> Result<OnUpgrade, UpgradeUnavailableError> {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
self
.self_mut()
.request_parts
.extensions
.remove::<OnUpgrade>()
- .ok_or_else(|| AnyError::msg("upgrade unavailable"))
+ .ok_or(UpgradeUnavailableError)
}
/// Take the Hyper body from this record.
@@ -387,6 +397,9 @@ impl HttpRecord {
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
+ if let Some(closed_channel) = inner.closed_channel.take() {
+ let _ = closed_channel.send(());
+ }
}
/// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).
@@ -515,7 +528,7 @@ pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
impl Body for HttpRecordResponse {
type Data = BufView;
- type Error = AnyError;
+ type Error = deno_core::error::AnyError;
fn poll_frame(
self: Pin<&mut Self>,
@@ -640,7 +653,7 @@ mod tests {
}
#[tokio::test]
- async fn test_handle_request() -> Result<(), AnyError> {
+ async fn test_handle_request() -> Result<(), deno_core::error::AnyError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let server_state = HttpServerState::new();
let server_state_check = server_state.clone();