summaryrefslogtreecommitdiff
path: root/ext/http/service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r--ext/http/service.rs338
1 files changed, 253 insertions, 85 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs
index 87b308755..fbd533cac 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -1,13 +1,18 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
-use crate::response_body::CompletionHandle;
-use crate::response_body::ResponseBytes;
+use crate::response_body::ResponseBytesInner;
+use crate::response_body::ResponseStreamResult;
use deno_core::error::AnyError;
+use deno_core::futures::ready;
+use deno_core::BufView;
use deno_core::OpState;
use deno_core::ResourceId;
use http::request::Parts;
use http::HeaderMap;
+use hyper1::body::Body;
+use hyper1::body::Frame;
use hyper1::body::Incoming;
+use hyper1::body::SizeHint;
use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
@@ -16,10 +21,15 @@ use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
use std::future::Future;
+use std::mem::ManuallyDrop;
+use std::pin::Pin;
use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Waker;
pub type Request = hyper1::Request<Incoming>;
-pub type Response = hyper1::Response<ResponseBytes>;
+pub type Response = hyper1::Response<HttpRecordResponse>;
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
@@ -39,6 +49,7 @@ pub(crate) use http_trace;
struct HttpServerStateInner {
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
+ drain_waker: Option<Waker>,
}
pub struct HttpServerState(RefCell<HttpServerStateInner>);
@@ -47,8 +58,32 @@ impl HttpServerState {
pub fn new() -> Rc<Self> {
Rc::new(Self(RefCell::new(HttpServerStateInner {
pool: Vec::new(),
+ drain_waker: None,
})))
}
+
+ pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
+ struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);
+
+ impl<'a> Future for HttpServerStateDrain<'a> {
+ type Output = ();
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ let server_state = self.0;
+ http_trace!(server_state, "HttpServerState::drain poll");
+ if Rc::strong_count(server_state) <= 1 {
+ return Poll::Ready(());
+ }
+ server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ HttpServerStateDrain(self)
+ }
}
enum RequestBodyState {
@@ -102,25 +137,28 @@ pub async fn handle_request(
http_trace!(*guarded_record, "handle_request response_ready.await");
guarded_record.response_ready().await;
- // Defuse the guard. Must not await after the point.
+ // Defuse the guard. Must not await after this point.
let record = ScopeGuard::into_inner(guarded_record);
http_trace!(record, "handle_request complete");
- let response = record.take_response();
- record.recycle();
+ let response = record.into_response();
Ok(response)
}
struct HttpRecordInner {
server_state: Rc<HttpServerState>,
request_info: HttpConnectionProperties,
- request_parts: Parts,
+ request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
- /// The response may get taken before we tear this down
- response: Option<Response>,
+ response_parts: Option<http::response::Parts>,
response_ready: bool,
- response_waker: Option<std::task::Waker>,
- trailers: Rc<RefCell<Option<HeaderMap>>>,
+ response_waker: Option<Waker>,
+ response_body: ResponseBytesInner,
+ response_body_finished: bool,
+ response_body_waker: Option<Waker>,
+ trailers: Option<HeaderMap>,
been_dropped: bool,
+ finished: bool,
+ needs_close_after_finish: bool,
}
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
@@ -147,45 +185,72 @@ impl HttpRecord {
server_state: Rc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
- let body = ResponseBytes::default();
- let trailers = body.trailers();
let request_body = Some(request_body.into());
- let mut response = Response::new(body);
- let reuse_record =
+ let (mut response_parts, _) = http::Response::new(()).into_parts();
+ let record =
if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
- *response.headers_mut() = headers;
- Some(record)
+ response_parts.headers = headers;
+ http_trace!(record, "HttpRecord::reuse");
+ record
} else {
- None
+ #[cfg(feature = "__http_tracing")]
+ {
+ RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ }
+
+ #[allow(clippy::let_and_return)]
+ let record = Rc::new(Self(RefCell::new(None)));
+ http_trace!(record, "HttpRecord::new");
+ record
};
- let inner = HttpRecordInner {
+ *record.0.borrow_mut() = Some(HttpRecordInner {
server_state,
request_info,
request_parts,
request_body,
- response: Some(response),
+ response_parts: Some(response_parts),
response_ready: false,
response_waker: None,
- trailers,
+ response_body: ResponseBytesInner::Empty,
+ response_body_finished: false,
+ response_body_waker: None,
+ trailers: None,
been_dropped: false,
- };
- if let Some(record) = reuse_record {
- *record.0.borrow_mut() = Some(inner);
- http_trace!(record, "HttpRecord::reuse");
- record
- } else {
- #[cfg(feature = "__http_tracing")]
- {
- RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
- }
+ finished: false,
+ needs_close_after_finish: false,
+ });
+ record
+ }
+
+ fn finish(self: Rc<Self>) {
+ http_trace!(self, "HttpRecord::finish");
+ let mut inner = self.self_mut();
+ inner.response_body_finished = true;
+ let response_body_waker = inner.response_body_waker.take();
+ let needs_close_after_finish = inner.needs_close_after_finish;
+ drop(inner);
+ if let Some(waker) = response_body_waker {
+ waker.wake();
+ }
+ if !needs_close_after_finish {
+ self.recycle();
+ }
+ }
- #[allow(clippy::let_and_return)]
- let record = Rc::new(Self(RefCell::new(Some(inner))));
- http_trace!(record, "HttpRecord::new");
- record
+ pub fn close_after_finish(self: Rc<Self>) {
+ debug_assert!(self.self_ref().needs_close_after_finish);
+ let mut inner = self.self_mut();
+ inner.needs_close_after_finish = false;
+ if !inner.finished {
+ drop(inner);
+ self.recycle();
}
}
+ pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> {
+ RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
+ }
+
fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
@@ -196,12 +261,19 @@ impl HttpRecord {
request_parts: Parts { mut headers, .. },
..
} = self.0.borrow_mut().take().unwrap();
+
let mut server_state_mut = server_state.0.borrow_mut();
let inflight = Rc::strong_count(&server_state);
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
- // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling
- // the to-drop objects off to another thread.
+ // Server is shutting down so wake the drain future.
+ if let Some(waker) = server_state_mut.drain_waker.take() {
+ drop(server_state_mut);
+ drop(server_state);
+ http_trace!(self, "HttpRecord::recycle wake");
+ waker.wake();
+ return;
+ }
// Keep a buffer of allocations on hand to be reused by incoming requests.
// Estimated target size is 16 + 1/8 the number of inflight requests.
@@ -235,7 +307,7 @@ impl HttpRecord {
}
/// Take the Hyper body from this record.
- pub fn take_body(&self) -> Option<Incoming> {
+ pub fn take_request_body(&self) -> Option<Incoming> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
@@ -247,18 +319,6 @@ impl HttpRecord {
}
}
- pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> {
- let body_holder = &mut self.self_mut().request_body;
- let body = body_holder.take();
- match body {
- Some(RequestBodyState::Resource(res)) => Some(res),
- x => {
- *body_holder = x;
- None
- }
- }
- }
-
/// Replace the request body with a resource ID and the OpState we'll need to shut it down.
/// We cannot keep just the resource itself, as JS code might be reading from the resource ID
/// to generate the response data (requiring us to keep it in the resource table).
@@ -273,7 +333,7 @@ impl HttpRecord {
if inner.response_ready {
// Future dropped between wake() and async fn resuming.
drop(inner);
- self.recycle();
+ self.finish();
return;
}
inner.been_dropped = true;
@@ -291,7 +351,7 @@ impl HttpRecord {
);
if inner.been_dropped {
drop(inner);
- self.recycle();
+ self.finish();
return;
}
inner.response_ready = true;
@@ -301,25 +361,44 @@ impl HttpRecord {
}
}
+ fn take_response_body(&self) -> ResponseBytesInner {
+ let mut inner = self.self_mut();
+ debug_assert!(
+ !matches!(inner.response_body, ResponseBytesInner::Done),
+ "HTTP state error: response body already complete"
+ );
+ std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done)
+ }
+
/// Has the future for this record been dropped? ie, has the underlying TCP connection
/// been closed?
pub fn cancelled(&self) -> bool {
self.self_ref().been_dropped
}
- /// Get a mutable reference to the response.
- pub fn response(&self) -> RefMut<'_, Response> {
- RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap())
+ /// Get a mutable reference to the response status and headers.
+ pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> {
+ RefMut::map(self.self_mut(), |inner| {
+ inner.response_parts.as_mut().unwrap()
+ })
}
/// Get a mutable reference to the trailers.
- pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> {
- Ref::map(self.self_ref(), |inner| &inner.trailers)
+ pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> {
+ RefMut::map(self.self_mut(), |inner| &mut inner.trailers)
+ }
+
+ pub fn set_response_body(&self, response_body: ResponseBytesInner) {
+ let mut inner = self.self_mut();
+ debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty));
+ inner.response_body = response_body;
}
/// Take the response.
- fn take_response(&self) -> Response {
- self.self_mut().response.take().unwrap()
+ fn into_response(self: Rc<Self>) -> Response {
+ let parts = self.self_mut().response_parts.take().unwrap();
+ let body = HttpRecordResponse(ManuallyDrop::new(self));
+ http::Response::from_parts(parts, body)
}
/// Get a reference to the connection properties.
@@ -332,38 +411,131 @@ impl HttpRecord {
Ref::map(self.self_ref(), |inner| &inner.request_parts)
}
- /// Get a reference to the completion handle.
+ /// Resolves when response head is ready.
fn response_ready(&self) -> impl Future<Output = ()> + '_ {
- struct HttpRecordComplete<'a>(&'a HttpRecord);
+ struct HttpRecordReady<'a>(&'a HttpRecord);
- impl<'a> Future for HttpRecordComplete<'a> {
+ impl<'a> Future for HttpRecordReady<'a> {
type Output = ();
fn poll(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
let mut mut_self = self.0.self_mut();
if mut_self.response_ready {
- return std::task::Poll::Ready(());
+ return Poll::Ready(());
}
mut_self.response_waker = Some(cx.waker().clone());
- std::task::Poll::Pending
+ Poll::Pending
}
}
- HttpRecordComplete(self)
+ HttpRecordReady(self)
}
- /// Get a reference to the response body completion handle.
- pub fn body_promise(&self) -> CompletionHandle {
- self
- .self_ref()
- .response
- .as_ref()
- .unwrap()
- .body()
- .completion_handle()
+ /// Resolves when response body has finished streaming.
+ pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ {
+ struct HttpRecordFinished<'a>(&'a HttpRecord);
+
+ impl<'a> Future for HttpRecordFinished<'a> {
+ type Output = ();
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ let mut mut_self = self.0.self_mut();
+ if mut_self.response_body_finished {
+ return Poll::Ready(());
+ }
+ mut_self.response_body_waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ HttpRecordFinished(self)
+ }
+}
+
+#[repr(transparent)]
+pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
+
+impl Body for HttpRecordResponse {
+ type Data = BufView;
+ type Error = AnyError;
+
+ fn poll_frame(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ use crate::response_body::PollFrame;
+ let record = &self.0;
+
+ let res = loop {
+ let mut inner = record.self_mut();
+ let res = match &mut inner.response_body {
+ ResponseBytesInner::Done | ResponseBytesInner::Empty => {
+ if let Some(trailers) = inner.trailers.take() {
+ return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
+ }
+ unreachable!()
+ }
+ ResponseBytesInner::Bytes(..) => {
+ drop(inner);
+ let ResponseBytesInner::Bytes(data) = record.take_response_body()
+ else {
+ unreachable!();
+ };
+ return Poll::Ready(Some(Ok(Frame::data(data))));
+ }
+ ResponseBytesInner::UncompressedStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
+ }
+ ResponseBytesInner::GZipStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
+ }
+ ResponseBytesInner::BrotliStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
+ }
+ };
+ // This is where we retry the NoData response
+ if matches!(res, ResponseStreamResult::NoData) {
+ continue;
+ }
+ break res;
+ };
+
+ if matches!(res, ResponseStreamResult::EndOfStream) {
+ if let Some(trailers) = record.self_mut().trailers.take() {
+ return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
+ }
+ record.take_response_body();
+ }
+ Poll::Ready(res.into())
+ }
+
+ fn is_end_stream(&self) -> bool {
+ let inner = self.0.self_ref();
+ matches!(
+ inner.response_body,
+ ResponseBytesInner::Done | ResponseBytesInner::Empty
+ ) && inner.trailers.is_none()
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
+ // anyways just in case hyper needs it.
+ self.0.self_ref().response_body.size_hint()
+ }
+}
+
+impl Drop for HttpRecordResponse {
+ fn drop(&mut self) {
+ // SAFETY: this ManuallyDrop is not used again.
+ let record = unsafe { ManuallyDrop::take(&mut self.0) };
+ http_trace!(record, "HttpRecordResponse::drop");
+ record.finish();
}
}
@@ -442,14 +614,10 @@ mod tests {
async move {
// JavaScript handler produces response
let record = rx.recv().await.unwrap();
- let resource = record.take_resource();
- record.response().body_mut().initialize(
- ResponseBytesInner::from_vec(
- Compression::None,
- b"hello world".to_vec(),
- ),
- resource,
- );
+ record.set_response_body(ResponseBytesInner::from_vec(
+ Compression::None,
+ b"hello world".to_vec(),
+ ));
record.complete();
Ok(())
},