diff options
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r-- | ext/http/response_body.rs | 171 |
1 files changed, 2 insertions, 169 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 7d91dce6b..7201855cc 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -1,10 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::cell::RefCell; -use std::future::Future; use std::io::Write; use std::pin::Pin; use std::rc::Rc; -use std::task::Waker; use brotli::enc::encode::BrotliEncoderParameter; use brotli::ffi::compressor::BrotliEncoderState; @@ -18,16 +15,13 @@ use deno_core::BufView; use deno_core::Resource; use flate2::write::GzEncoder; use http::HeaderMap; -use hyper1::body::Body; use hyper1::body::Frame; use hyper1::body::SizeHint; use pin_project::pin_project; -use crate::service::HttpRequestBodyAutocloser; - /// Simplification for nested types we use for our streams. We provide a way to convert from /// this type into Hyper's body [`Frame`]. -enum ResponseStreamResult { +pub enum ResponseStreamResult { /// Stream is over. EndOfStream, /// Stream provided non-empty data. @@ -57,53 +51,7 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> { } } -#[derive(Clone, Debug, Default)] -pub struct CompletionHandle { - inner: Rc<RefCell<CompletionHandleInner>>, -} - -#[derive(Debug, Default)] -struct CompletionHandleInner { - complete: bool, - success: bool, - waker: Option<Waker>, -} - -impl CompletionHandle { - pub fn complete(&self, success: bool) { - let mut mut_self = self.inner.borrow_mut(); - mut_self.complete = true; - mut_self.success = success; - if let Some(waker) = mut_self.waker.take() { - drop(mut_self); - waker.wake(); - } - } - - #[allow(dead_code)] - pub fn is_completed(&self) -> bool { - self.inner.borrow().complete - } -} - -impl Future for CompletionHandle { - type Output = bool; - - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Self::Output> { - let mut mut_self = self.inner.borrow_mut(); - if mut_self.complete { - return std::task::Poll::Ready(mut_self.success); - } - - mut_self.waker = Some(cx.waker().clone()); - std::task::Poll::Pending - } -} - -trait PollFrame: Unpin { +pub trait PollFrame: Unpin { fn poll_frame( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -166,52 +114,6 @@ impl std::fmt::Debug for ResponseBytesInner { } } -/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface -/// required by hyper. As the API requires information about request completion (including a success/fail -/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on. -#[derive(Default)] -pub struct ResponseBytes { - inner: ResponseBytesInner, - completion_handle: CompletionHandle, - headers: Rc<RefCell<Option<HeaderMap>>>, - res: Option<HttpRequestBodyAutocloser>, -} - -impl ResponseBytes { - pub fn initialize( - &mut self, - inner: ResponseBytesInner, - req_body_resource: Option<HttpRequestBodyAutocloser>, - ) { - debug_assert!(matches!(self.inner, ResponseBytesInner::Empty)); - self.inner = inner; - self.res = req_body_resource; - } - - pub fn completion_handle(&self) -> CompletionHandle { - self.completion_handle.clone() - } - - pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> { - self.headers.clone() - } - - fn complete(&mut self, success: bool) -> ResponseBytesInner { - if matches!(self.inner, ResponseBytesInner::Done) { - return ResponseBytesInner::Done; - } - - let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); - self.completion_handle.complete(success); - if success { - current - } else { - current.abort(); - ResponseBytesInner::Done - } - } -} - impl ResponseBytesInner { pub fn abort(self) { match self { @@ -298,75 +200,6 @@ impl ResponseBytesInner { } } -impl Body for ResponseBytes { - type Data = BufView; - type Error = AnyError; - - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { - let res = loop { - let res = match &mut self.inner { - ResponseBytesInner::Done | ResponseBytesInner::Empty => { - if let Some(trailers) = self.headers.borrow_mut().take() { - return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); - } - unreachable!() - } - ResponseBytesInner::Bytes(..) => { - let ResponseBytesInner::Bytes(data) = self.complete(true) else { - unreachable!(); - }; - return std::task::Poll::Ready(Some(Ok(Frame::data(data)))); - } - ResponseBytesInner::UncompressedStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - ResponseBytesInner::GZipStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - ResponseBytesInner::BrotliStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - }; - // This is where we retry the NoData response - if matches!(res, ResponseStreamResult::NoData) { - continue; - } - break res; - }; - - if matches!(res, ResponseStreamResult::EndOfStream) { - if let Some(trailers) = self.headers.borrow_mut().take() { - return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); - } - self.complete(true); - } - std::task::Poll::Ready(res.into()) - } - - fn is_end_stream(&self) -> bool { - matches!( - self.inner, - ResponseBytesInner::Done | ResponseBytesInner::Empty - ) && self.headers.borrow_mut().is_none() - } - - fn size_hint(&self) -> SizeHint { - // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through - // anyways just in case hyper needs it. - self.inner.size_hint() - } -} - -impl Drop for ResponseBytes { - fn drop(&mut self) { - // We won't actually poll_frame for Empty responses so this is where we return success - self.complete(matches!(self.inner, ResponseBytesInner::Empty)); - } -} - pub struct ResourceBodyAdapter { auto_close: bool, stm: Rc<dyn Resource>, |