summaryrefslogtreecommitdiff
path: root/ext/http/response_body.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r--ext/http/response_body.rs83
1 files changed, 13 insertions, 70 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 3697b2732..bd9d6f433 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -1,5 +1,4 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
@@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
use bytes::Bytes;
use bytes::BytesMut;
-use deno_core::error::bad_resource;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::futures::FutureExt;
-use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::RcRef;
use deno_core::Resource;
-use deno_core::WriteOutcome;
use flate2::write::GzEncoder;
use http::HeaderMap;
use hyper1::body::Body;
@@ -126,8 +119,8 @@ pub enum Compression {
pub enum ResponseStream {
/// A resource stream, piped in fast mode.
Resource(ResourceBodyAdapter),
- /// A JS-backed stream, written in JS and transported via pipe.
- V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+ #[cfg(test)]
+ TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
#[derive(Default)]
@@ -217,13 +210,6 @@ impl ResponseBytesInner {
}
}
- pub fn from_v8(
- compression: Compression,
- rx: tokio::sync::mpsc::Receiver<BufView>,
- ) -> Self {
- Self::from_stream(compression, ResponseStream::V8Stream(rx))
- }
-
pub fn from_resource(
compression: Compression,
stm: Rc<dyn Resource>,
@@ -235,12 +221,12 @@ impl ResponseBytesInner {
)
}
- pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
+ pub fn from_bufview(compression: Compression, buf: BufView) -> Self {
match compression {
Compression::GZip => {
let mut writer =
GzEncoder::new(Vec::new(), flate2::Compression::fast());
- writer.write_all(bytes).unwrap();
+ writer.write_all(&buf).unwrap();
Self::Bytes(BufView::from(writer.finish().unwrap()))
}
Compression::Brotli => {
@@ -251,11 +237,11 @@ impl ResponseBytesInner {
// (~4MB)
let mut writer =
brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22);
- writer.write_all(bytes).unwrap();
+ writer.write_all(&buf).unwrap();
writer.flush().unwrap();
Self::Bytes(BufView::from(writer.into_inner()))
}
- _ => Self::Bytes(BufView::from(bytes.to_vec())),
+ _ => Self::Bytes(buf),
}
}
@@ -368,14 +354,16 @@ impl PollFrame for ResponseStream {
) -> std::task::Poll<ResponseStreamResult> {
match &mut *self {
ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
- ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
+ #[cfg(test)]
+ ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx),
}
}
fn size_hint(&self) -> SizeHint {
match self {
ResponseStream::Resource(res) => res.size_hint(),
- ResponseStream::V8Stream(res) => res.size_hint(),
+ #[cfg(test)]
+ ResponseStream::TestChannel(_) => SizeHint::default(),
}
}
}
@@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter {
}
}
+#[cfg(test)]
impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
fn poll_frame(
mut self: Pin<&mut Self>,
@@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream {
}
}
-/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
-/// feed's hyper's HTTP response.
-pub struct V8StreamHttpResponseBody(
- AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
- CancelHandle,
-);
-
-impl V8StreamHttpResponseBody {
- pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
- Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
- }
-}
-
-impl Resource for V8StreamHttpResponseBody {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
-
- fn write(
- self: Rc<Self>,
- buf: BufView,
- ) -> AsyncResult<deno_core::WriteOutcome> {
- let cancel_handle = RcRef::map(&self, |this| &this.1);
- Box::pin(
- async move {
- let nwritten = buf.len();
-
- let res = RcRef::map(self, |this| &this.0).borrow().await;
- if let Some(tx) = res.as_ref() {
- tx.send(buf)
- .await
- .map_err(|_| bad_resource("failed to write"))?;
- Ok(WriteOutcome::Full { nwritten })
- } else {
- Err(bad_resource("failed to write"))
- }
- }
- .try_or_cancel(cancel_handle),
- )
- }
-
- fn close(self: Rc<Self>) {
- self.1.cancel();
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -892,7 +835,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let underlying = ResponseStream::V8Stream(rx);
+ let underlying = ResponseStream::TestChannel(rx);
let mut resp = GZipResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {
@@ -934,7 +877,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let underlying = ResponseStream::V8Stream(rx);
+ let underlying = ResponseStream::TestChannel(rx);
let mut resp = BrotliResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {