summaryrefslogtreecommitdiff
path: root/ext/http/request_body.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-04-22 11:48:21 -0600
committerGitHub <noreply@github.com>2023-04-22 11:48:21 -0600
commitbdffcb409fd1e257db280ab73e07cc319711256c (patch)
tree9aca1c1e73f0249bba8b66781b79c358a7a00798 /ext/http/request_body.rs
parentd137501a639cb315772866f6775fcd9f43e28f5b (diff)
feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619)
This is a rewrite of the `Deno.serve` API to live on top of hyper 1.0-rc3. The code should be more maintainable long-term, and avoids some of the slower mpsc patterns that made the older code less efficient than it could have been. Missing features: - `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available, however). - Automatic compression is unavailable on responses.
Diffstat (limited to 'ext/http/request_body.rs')
-rw-r--r--ext/http/request_body.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs
new file mode 100644
index 000000000..73908ca55
--- /dev/null
+++ b/ext/http/request_body.rs
@@ -0,0 +1,84 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use bytes::Bytes;
+use deno_core::error::AnyError;
+use deno_core::futures::stream::Peekable;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::RcRef;
+use deno_core::Resource;
+use hyper1::body::Body;
+use hyper1::body::Incoming;
+use hyper1::body::SizeHint;
+use std::borrow::Cow;
+use std::pin::Pin;
+use std::rc::Rc;
+
+/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8.
+struct ReadFuture(Incoming);
+
+impl Stream for ReadFuture {
+ type Item = Result<Bytes, AnyError>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let res = Pin::new(&mut self.get_mut().0).poll_frame(cx);
+ match res {
+ std::task::Poll::Ready(Some(Ok(frame))) => {
+ if let Ok(data) = frame.into_data() {
+ // Ensure that we never yield an empty frame
+ if !data.is_empty() {
+ return std::task::Poll::Ready(Some(Ok(data)));
+ }
+ }
+ }
+ std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
+ _ => {}
+ }
+ std::task::Poll::Pending
+ }
+}
+
+pub struct HttpRequestBody(AsyncRefCell<Peekable<ReadFuture>>, SizeHint);
+
+impl HttpRequestBody {
+ pub fn new(body: Incoming) -> Self {
+ let size_hint = body.size_hint();
+ Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint)
+ }
+
+ async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+ let peekable = RcRef::map(self, |this| &this.0);
+ let mut peekable = peekable.borrow_mut().await;
+ match Pin::new(&mut *peekable).peek_mut().await {
+ None => Ok(BufView::empty()),
+ Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
+ Some(Ok(bytes)) => {
+ if bytes.len() <= limit {
+ // We can safely take the next item since we peeked it
+ return Ok(BufView::from(peekable.next().await.unwrap()?));
+ }
+ let ret = bytes.split_to(limit);
+ Ok(BufView::from(ret))
+ }
+ }
+ }
+}
+
+impl Resource for HttpRequestBody {
+ fn name(&self) -> Cow<str> {
+ "requestBody".into()
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(HttpRequestBody::read(self, limit))
+ }
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ (self.1.lower(), self.1.upper())
+ }
+}