diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-09-25 09:23:55 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-25 17:23:55 +0200 |
commit | a27ee8f368dbac33141fdcb9a17d0e4ea907b8ef (patch) | |
tree | 4bc1bfa1e23b40bfb726cd5b179091393533dec2 /ext/http/response_body.rs | |
parent | 83f20007aac0f9ebb0eb59b71a932e7a91d5d9a7 (diff) |
fix(ext/http): ensure that resources are closed when request is cancelled (#20641)
Builds on top of #20622 to fix #10854
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r-- | ext/http/response_body.rs | 34 |
1 files changed, 33 insertions, 1 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 5c946a4d3..4f7e3b0a5 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -125,6 +125,16 @@ pub enum ResponseStream { TestChannel(tokio::sync::mpsc::Receiver<BufView>), } +impl ResponseStream { + pub fn abort(self) { + match self { + ResponseStream::Resource(resource) => resource.stm.close(), + #[cfg(test)] + ResponseStream::TestChannel(..) => {} + } + } +} + #[derive(Default)] pub enum ResponseBytesInner { /// An empty stream. @@ -192,11 +202,25 @@ impl ResponseBytes { let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); self.completion_handle.complete(success); - current + if success { + current + } else { + current.abort(); + ResponseBytesInner::Done + } } } impl ResponseBytesInner { + pub fn abort(self) { + match self { + Self::Done | Self::Empty | Self::Bytes(..) => {} + Self::BrotliStream(stm) => stm.abort(), + Self::GZipStream(stm) => stm.abort(), + Self::UncompressedStream(stm) => stm.abort(), + } + } + pub fn size_hint(&self) -> SizeHint { match self { Self::Done => SizeHint::with_exact(0), @@ -463,6 +487,10 @@ impl GZipResponseStream { underlying, } } + + pub fn abort(self) { + self.underlying.abort() + } } /// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide @@ -645,6 +673,10 @@ impl BrotliResponseStream { underlying, } } + + pub fn abort(self) { + self.underlying.abort() + } } fn max_compressed_size(input_size: usize) -> usize { |