summaryrefslogtreecommitdiff
path: root/ext/http/response_body.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r--ext/http/response_body.rs171
1 files changed, 2 insertions, 169 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 7d91dce6b..7201855cc 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -1,10 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::cell::RefCell;
-use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::rc::Rc;
-use std::task::Waker;
use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
@@ -18,16 +15,13 @@ use deno_core::BufView;
use deno_core::Resource;
use flate2::write::GzEncoder;
use http::HeaderMap;
-use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
-use crate::service::HttpRequestBodyAutocloser;
-
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
-enum ResponseStreamResult {
+pub enum ResponseStreamResult {
/// Stream is over.
EndOfStream,
/// Stream provided non-empty data.
@@ -57,53 +51,7 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
}
}
-#[derive(Clone, Debug, Default)]
-pub struct CompletionHandle {
- inner: Rc<RefCell<CompletionHandleInner>>,
-}
-
-#[derive(Debug, Default)]
-struct CompletionHandleInner {
- complete: bool,
- success: bool,
- waker: Option<Waker>,
-}
-
-impl CompletionHandle {
- pub fn complete(&self, success: bool) {
- let mut mut_self = self.inner.borrow_mut();
- mut_self.complete = true;
- mut_self.success = success;
- if let Some(waker) = mut_self.waker.take() {
- drop(mut_self);
- waker.wake();
- }
- }
-
- #[allow(dead_code)]
- pub fn is_completed(&self) -> bool {
- self.inner.borrow().complete
- }
-}
-
-impl Future for CompletionHandle {
- type Output = bool;
-
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
- let mut mut_self = self.inner.borrow_mut();
- if mut_self.complete {
- return std::task::Poll::Ready(mut_self.success);
- }
-
- mut_self.waker = Some(cx.waker().clone());
- std::task::Poll::Pending
- }
-}
-
-trait PollFrame: Unpin {
+pub trait PollFrame: Unpin {
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
@@ -166,52 +114,6 @@ impl std::fmt::Debug for ResponseBytesInner {
}
}
-/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
-/// required by hyper. As the API requires information about request completion (including a success/fail
-/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
-#[derive(Default)]
-pub struct ResponseBytes {
- inner: ResponseBytesInner,
- completion_handle: CompletionHandle,
- headers: Rc<RefCell<Option<HeaderMap>>>,
- res: Option<HttpRequestBodyAutocloser>,
-}
-
-impl ResponseBytes {
- pub fn initialize(
- &mut self,
- inner: ResponseBytesInner,
- req_body_resource: Option<HttpRequestBodyAutocloser>,
- ) {
- debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
- self.inner = inner;
- self.res = req_body_resource;
- }
-
- pub fn completion_handle(&self) -> CompletionHandle {
- self.completion_handle.clone()
- }
-
- pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
- self.headers.clone()
- }
-
- fn complete(&mut self, success: bool) -> ResponseBytesInner {
- if matches!(self.inner, ResponseBytesInner::Done) {
- return ResponseBytesInner::Done;
- }
-
- let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
- self.completion_handle.complete(success);
- if success {
- current
- } else {
- current.abort();
- ResponseBytesInner::Done
- }
- }
-}
-
impl ResponseBytesInner {
pub fn abort(self) {
match self {
@@ -298,75 +200,6 @@ impl ResponseBytesInner {
}
}
-impl Body for ResponseBytes {
- type Data = BufView;
- type Error = AnyError;
-
- fn poll_frame(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
- let res = loop {
- let res = match &mut self.inner {
- ResponseBytesInner::Done | ResponseBytesInner::Empty => {
- if let Some(trailers) = self.headers.borrow_mut().take() {
- return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
- }
- unreachable!()
- }
- ResponseBytesInner::Bytes(..) => {
- let ResponseBytesInner::Bytes(data) = self.complete(true) else {
- unreachable!();
- };
- return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
- }
- ResponseBytesInner::UncompressedStream(stm) => {
- ready!(Pin::new(stm).poll_frame(cx))
- }
- ResponseBytesInner::GZipStream(stm) => {
- ready!(Pin::new(stm).poll_frame(cx))
- }
- ResponseBytesInner::BrotliStream(stm) => {
- ready!(Pin::new(stm).poll_frame(cx))
- }
- };
- // This is where we retry the NoData response
- if matches!(res, ResponseStreamResult::NoData) {
- continue;
- }
- break res;
- };
-
- if matches!(res, ResponseStreamResult::EndOfStream) {
- if let Some(trailers) = self.headers.borrow_mut().take() {
- return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
- }
- self.complete(true);
- }
- std::task::Poll::Ready(res.into())
- }
-
- fn is_end_stream(&self) -> bool {
- matches!(
- self.inner,
- ResponseBytesInner::Done | ResponseBytesInner::Empty
- ) && self.headers.borrow_mut().is_none()
- }
-
- fn size_hint(&self) -> SizeHint {
- // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
- // anyways just in case hyper needs it.
- self.inner.size_hint()
- }
-}
-
-impl Drop for ResponseBytes {
- fn drop(&mut self) {
- // We won't actually poll_frame for Empty responses so this is where we return success
- self.complete(matches!(self.inner, ResponseBytesInner::Empty));
- }
-}
-
pub struct ResourceBodyAdapter {
auto_close: bool,
stm: Rc<dyn Resource>,