summaryrefslogtreecommitdiff
path: root/ext/http/response_body.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-04-22 11:48:21 -0600
committerGitHub <noreply@github.com>2023-04-22 11:48:21 -0600
commitbdffcb409fd1e257db280ab73e07cc319711256c (patch)
tree9aca1c1e73f0249bba8b66781b79c358a7a00798 /ext/http/response_body.rs
parentd137501a639cb315772866f6775fcd9f43e28f5b (diff)
feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619)
This is a rewrite of the `Deno.serve` API to live on top of hyper 1.0-rc3. The code should be more maintainable long-term, and avoids some of the slower mpsc patterns that made the older code less efficient than it could have been. Missing features: - `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available, however). - Automatic compression is unavailable on responses.
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r--ext/http/response_body.rs253
1 files changed, 253 insertions, 0 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
new file mode 100644
index 000000000..0086e4d78
--- /dev/null
+++ b/ext/http/response_body.rs
@@ -0,0 +1,253 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Waker;
+
+use deno_core::error::bad_resource;
+use deno_core::error::AnyError;
+use deno_core::futures::FutureExt;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::WriteOutcome;
+use hyper1::body::Body;
+use hyper1::body::Frame;
+use hyper1::body::SizeHint;
+
+#[derive(Clone, Debug, Default)]
+pub struct CompletionHandle {
+ inner: Rc<RefCell<CompletionHandleInner>>,
+}
+
+#[derive(Debug, Default)]
+struct CompletionHandleInner {
+ complete: bool,
+ success: bool,
+ waker: Option<Waker>,
+}
+
+impl CompletionHandle {
+ pub fn complete(&self, success: bool) {
+ let mut mut_self = self.inner.borrow_mut();
+ mut_self.complete = true;
+ mut_self.success = success;
+ if let Some(waker) = mut_self.waker.take() {
+ drop(mut_self);
+ waker.wake();
+ }
+ }
+}
+
+impl Future for CompletionHandle {
+ type Output = bool;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let mut mut_self = self.inner.borrow_mut();
+ if mut_self.complete {
+ return std::task::Poll::Ready(mut_self.success);
+ }
+
+ mut_self.waker = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+}
+
+#[derive(Default)]
+pub enum ResponseBytesInner {
+ /// An empty stream.
+ #[default]
+ Empty,
+ /// A completed stream.
+ Done,
+ /// A static buffer of bytes, sent it one fell swoop.
+ Bytes(BufView),
+ /// A resource stream, piped in fast mode.
+ Resource(bool, Rc<dyn Resource>, AsyncResult<BufView>),
+ /// A JS-backed stream, written in JS and transported via pipe.
+ V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+}
+
+impl std::fmt::Debug for ResponseBytesInner {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::Done => f.write_str("Done"),
+ Self::Empty => f.write_str("Empty"),
+ Self::Bytes(..) => f.write_str("Bytes"),
+ Self::Resource(..) => f.write_str("Resource"),
+ Self::V8Stream(..) => f.write_str("V8Stream"),
+ }
+ }
+}
+
+/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
+/// required by hyper. As the API requires information about request completion (including a success/fail
+/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
+#[derive(Debug, Default)]
+pub struct ResponseBytes(ResponseBytesInner, CompletionHandle);
+
+impl ResponseBytes {
+ pub fn initialize(&mut self, inner: ResponseBytesInner) {
+ debug_assert!(matches!(self.0, ResponseBytesInner::Empty));
+ self.0 = inner;
+ }
+
+ pub fn completion_handle(&self) -> CompletionHandle {
+ self.1.clone()
+ }
+
+ fn complete(&mut self, success: bool) -> ResponseBytesInner {
+ if matches!(self.0, ResponseBytesInner::Done) {
+ return ResponseBytesInner::Done;
+ }
+
+ let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done);
+ self.1.complete(success);
+ current
+ }
+}
+
+impl ResponseBytesInner {
+ pub fn size_hint(&self) -> SizeHint {
+ match self {
+ Self::Done => SizeHint::with_exact(0),
+ Self::Empty => SizeHint::with_exact(0),
+ Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
+ Self::Resource(_, res, _) => {
+ let hint = res.size_hint();
+ let mut size_hint = SizeHint::new();
+ size_hint.set_lower(hint.0);
+ if let Some(upper) = hint.1 {
+ size_hint.set_upper(upper)
+ }
+ size_hint
+ }
+ Self::V8Stream(..) => SizeHint::default(),
+ }
+ }
+}
+
+impl Body for ResponseBytes {
+ type Data = BufView;
+ type Error = AnyError;
+
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ match &mut self.0 {
+ ResponseBytesInner::Done | ResponseBytesInner::Empty => {
+ unreachable!()
+ }
+ ResponseBytesInner::Bytes(..) => {
+ if let ResponseBytesInner::Bytes(data) = self.complete(true) {
+ std::task::Poll::Ready(Some(Ok(Frame::data(data))))
+ } else {
+ unreachable!()
+ }
+ }
+ ResponseBytesInner::Resource(auto_close, stm, ref mut future) => {
+ match future.poll_unpin(cx) {
+ std::task::Poll::Pending => std::task::Poll::Pending,
+ std::task::Poll::Ready(Err(err)) => {
+ std::task::Poll::Ready(Some(Err(err)))
+ }
+ std::task::Poll::Ready(Ok(buf)) => {
+ if buf.is_empty() {
+ if *auto_close {
+ stm.clone().close();
+ }
+ self.complete(true);
+ return std::task::Poll::Ready(None);
+ }
+ // Re-arm the future
+ *future = stm.clone().read(64 * 1024);
+ std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
+ }
+ }
+ }
+ ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) {
+ std::task::Poll::Pending => std::task::Poll::Pending,
+ std::task::Poll::Ready(Some(buf)) => {
+ std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
+ }
+ std::task::Poll::Ready(None) => {
+ self.complete(true);
+ std::task::Poll::Ready(None)
+ }
+ },
+ }
+ }
+
+ fn is_end_stream(&self) -> bool {
+ matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
+ // anyways just in case hyper needs it.
+ self.0.size_hint()
+ }
+}
+
+impl Drop for ResponseBytes {
+ fn drop(&mut self) {
+ // We won't actually poll_frame for Empty responses so this is where we return success
+ self.complete(matches!(self.0, ResponseBytesInner::Empty));
+ }
+}
+
+/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
+/// feed's hyper's HTTP response.
+pub struct V8StreamHttpResponseBody(
+ AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
+ CancelHandle,
+);
+
+impl V8StreamHttpResponseBody {
+ pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
+ Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
+ }
+}
+
+impl Resource for V8StreamHttpResponseBody {
+ fn name(&self) -> Cow<str> {
+ "responseBody".into()
+ }
+
+ fn write(
+ self: Rc<Self>,
+ buf: BufView,
+ ) -> AsyncResult<deno_core::WriteOutcome> {
+ let cancel_handle = RcRef::map(&self, |this| &this.1);
+ Box::pin(
+ async move {
+ let nwritten = buf.len();
+
+ let res = RcRef::map(self, |this| &this.0).borrow().await;
+ if let Some(tx) = res.as_ref() {
+ tx.send(buf)
+ .await
+ .map_err(|_| bad_resource("failed to write"))?;
+ Ok(WriteOutcome::Full { nwritten })
+ } else {
+ Err(bad_resource("failed to write"))
+ }
+ }
+ .try_or_cancel(cancel_handle),
+ )
+ }
+
+ fn close(self: Rc<Self>) {
+ self.1.cancel();
+ }
+}