summaryrefslogtreecommitdiff
path: root/ext/http/response_body.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-09-25 09:23:55 -0600
committerGitHub <noreply@github.com>2023-09-25 17:23:55 +0200
commita27ee8f368dbac33141fdcb9a17d0e4ea907b8ef (patch)
tree4bc1bfa1e23b40bfb726cd5b179091393533dec2 /ext/http/response_body.rs
parent83f20007aac0f9ebb0eb59b71a932e7a91d5d9a7 (diff)
fix(ext/http): ensure that resources are closed when request is cancelled (#20641)
Builds on top of #20622 to fix #10854
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r--ext/http/response_body.rs34
1 files changed, 33 insertions, 1 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 5c946a4d3..4f7e3b0a5 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -125,6 +125,16 @@ pub enum ResponseStream {
TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
+impl ResponseStream {
+ pub fn abort(self) {
+ match self {
+ ResponseStream::Resource(resource) => resource.stm.close(),
+ #[cfg(test)]
+ ResponseStream::TestChannel(..) => {}
+ }
+ }
+}
+
#[derive(Default)]
pub enum ResponseBytesInner {
/// An empty stream.
@@ -192,11 +202,25 @@ impl ResponseBytes {
let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
self.completion_handle.complete(success);
- current
+ if success {
+ current
+ } else {
+ current.abort();
+ ResponseBytesInner::Done
+ }
}
}
impl ResponseBytesInner {
+ pub fn abort(self) {
+ match self {
+ Self::Done | Self::Empty | Self::Bytes(..) => {}
+ Self::BrotliStream(stm) => stm.abort(),
+ Self::GZipStream(stm) => stm.abort(),
+ Self::UncompressedStream(stm) => stm.abort(),
+ }
+ }
+
pub fn size_hint(&self) -> SizeHint {
match self {
Self::Done => SizeHint::with_exact(0),
@@ -463,6 +487,10 @@ impl GZipResponseStream {
underlying,
}
}
+
+ pub fn abort(self) {
+ self.underlying.abort()
+ }
}
/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
@@ -645,6 +673,10 @@ impl BrotliResponseStream {
underlying,
}
}
+
+ pub fn abort(self) {
+ self.underlying.abort()
+ }
}
fn max_compressed_size(input_size: usize) -> usize {