summaryrefslogtreecommitdiff
path: root/ext/http/service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r--ext/http/service.rs401
1 files changed, 401 insertions, 0 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs
new file mode 100644
index 000000000..ea67980f3
--- /dev/null
+++ b/ext/http/service.rs
@@ -0,0 +1,401 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::request_properties::HttpConnectionProperties;
+use crate::response_body::CompletionHandle;
+use crate::response_body::ResponseBytes;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+use deno_core::ResourceId;
+use http::request::Parts;
+use http::HeaderMap;
+use hyper1::body::Incoming;
+use hyper1::upgrade::OnUpgrade;
+
+use scopeguard::guard;
+use scopeguard::ScopeGuard;
+use std::cell::Ref;
+use std::cell::RefCell;
+use std::cell::RefMut;
+use std::future::Future;
+use std::rc::Rc;
+
+pub type Request = hyper1::Request<Incoming>;
+pub type Response = hyper1::Response<ResponseBytes>;
+
+macro_rules! http_trace {
+ ($record:expr, $args:tt) => {
+ #[cfg(feature = "__http_tracing")]
+ {
+ println!(
+ "HTTP id={:p} strong={}: {}",
+ $record,
+ std::rc::Rc::strong_count(&$record),
+ format!($args),
+ );
+ }
+ };
+}
+
+pub(crate) use http_trace;
+
+#[repr(transparent)]
+#[derive(Clone, Default)]
+pub struct RefCount(pub Rc<()>);
+
+enum RequestBodyState {
+ Incoming(Incoming),
+ Resource(HttpRequestBodyAutocloser),
+}
+
+impl From<Incoming> for RequestBodyState {
+ fn from(value: Incoming) -> Self {
+ RequestBodyState::Incoming(value)
+ }
+}
+
+/// Ensures that the request body closes itself when no longer needed.
+pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
+
+impl HttpRequestBodyAutocloser {
+ pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
+ Self(res, op_state)
+ }
+}
+
+impl Drop for HttpRequestBodyAutocloser {
+ fn drop(&mut self) {
+ if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) {
+ res.close();
+ }
+ }
+}
+
+pub async fn handle_request(
+ request: Request,
+ request_info: HttpConnectionProperties,
+ _refcount: RefCount, // Keep server alive for duration of this future.
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
+) -> Result<Response, hyper::Error> {
+ // If the underlying TCP connection is closed, this future will be dropped
+ // and execution could stop at any await point.
+ // The HttpRecord must live until JavaScript is done processing so is wrapped
+ // in an Rc. The guard ensures unneeded resources are freed at cancellation.
+ let guarded_record =
+ guard(HttpRecord::new(request, request_info), HttpRecord::cancel);
+
+ // Clone HttpRecord and send to JavaScript for processing.
+ // Safe to unwrap as channel receiver is never closed.
+ tx.send(guarded_record.clone()).await.unwrap();
+
+ // Wait for JavaScript handler to return request.
+ http_trace!(*guarded_record, "handle_request response_ready.await");
+ guarded_record.response_ready().await;
+
+ // Defuse the guard. Must not await after the point.
+ let record = ScopeGuard::into_inner(guarded_record);
+ http_trace!(record, "handle_request complete");
+ assert!(
+ Rc::strong_count(&record) == 1,
+ "HTTP state error: Expected to be last strong reference (handle_request)"
+ );
+ let response = record.take_response();
+ Ok(response)
+}
+
+struct HttpRecordInner {
+ request_info: HttpConnectionProperties,
+ request_parts: Parts,
+ request_body: Option<RequestBodyState>,
+ /// The response may get taken before we tear this down
+ response: Option<Response>,
+ response_ready: bool,
+ response_waker: Option<std::task::Waker>,
+ trailers: Rc<RefCell<Option<HeaderMap>>>,
+ been_dropped: bool,
+}
+
+pub struct HttpRecord(RefCell<HttpRecordInner>);
+
+#[cfg(feature = "__http_tracing")]
+pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
+ std::sync::atomic::AtomicUsize::new(0);
+
+#[cfg(feature = "__http_tracing")]
+impl Drop for HttpRecord {
+ fn drop(&mut self) {
+ let count = RECORD_COUNT
+ .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
+ .checked_sub(1)
+ .expect("Count went below zero");
+ println!("HTTP count={count}: HttpRecord::drop");
+ }
+}
+
+impl HttpRecord {
+ fn new(request: Request, request_info: HttpConnectionProperties) -> Rc<Self> {
+ #[cfg(feature = "__http_tracing")]
+ {
+ RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ }
+ let (request_parts, request_body) = request.into_parts();
+ let body = ResponseBytes::default();
+ let trailers = body.trailers();
+ let request_body = Some(request_body.into());
+ let inner = HttpRecordInner {
+ request_info,
+ request_parts,
+ request_body,
+ response: Some(Response::new(body)),
+ response_ready: false,
+ response_waker: None,
+ trailers,
+ been_dropped: false,
+ };
+ #[allow(clippy::let_and_return)]
+ let record = Rc::new(Self(RefCell::new(inner)));
+ http_trace!(record, "HttpRecord::new");
+ record
+ }
+
+ fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
+ self.0.borrow()
+ }
+
+ fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
+ self.0.borrow_mut()
+ }
+
+ /// Perform the Hyper upgrade on this record.
+ pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> {
+ // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
+ self
+ .self_mut()
+ .request_parts
+ .extensions
+ .remove::<OnUpgrade>()
+ .ok_or_else(|| AnyError::msg("upgrade unavailable"))
+ }
+
+ /// Take the Hyper body from this record.
+ pub fn take_body(&self) -> Option<Incoming> {
+ let body_holder = &mut self.self_mut().request_body;
+ let body = body_holder.take();
+ match body {
+ Some(RequestBodyState::Incoming(body)) => Some(body),
+ x => {
+ *body_holder = x;
+ None
+ }
+ }
+ }
+
+ pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> {
+ let body_holder = &mut self.self_mut().request_body;
+ let body = body_holder.take();
+ match body {
+ Some(RequestBodyState::Resource(res)) => Some(res),
+ x => {
+ *body_holder = x;
+ None
+ }
+ }
+ }
+
+ /// Replace the request body with a resource ID and the OpState we'll need to shut it down.
+ /// We cannot keep just the resource itself, as JS code might be reading from the resource ID
+ /// to generate the response data (requiring us to keep it in the resource table).
+ pub fn put_resource(&self, res: HttpRequestBodyAutocloser) {
+ self.self_mut().request_body = Some(RequestBodyState::Resource(res));
+ }
+
+ /// Cleanup resources not needed after the future is dropped.
+ fn cancel(self: Rc<Self>) {
+ http_trace!(self, "HttpRecord::cancel");
+ let mut inner = self.0.borrow_mut();
+ inner.been_dropped = true;
+ // The request body might include actual resources.
+ inner.request_body.take();
+ }
+
+ /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).
+ pub fn complete(self: Rc<Self>) {
+ http_trace!(self, "HttpRecord::complete");
+ let mut inner = self.self_mut();
+ assert!(
+ !inner.been_dropped || Rc::strong_count(&self) == 1,
+ "HTTP state error: Expected to be last strong reference (been_dropped)"
+ );
+ assert!(
+ !inner.response_ready,
+ "HTTP state error: Entry has already been completed"
+ );
+ inner.response_ready = true;
+ if let Some(waker) = inner.response_waker.take() {
+ drop(inner);
+ waker.wake();
+ }
+ }
+
+ /// Has the future for this record been dropped? ie, has the underlying TCP connection
+ /// been closed?
+ pub fn cancelled(&self) -> bool {
+ self.self_ref().been_dropped
+ }
+
+ /// Get a mutable reference to the response.
+ pub fn response(&self) -> RefMut<'_, Response> {
+ RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap())
+ }
+
+ /// Get a mutable reference to the trailers.
+ pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> {
+ Ref::map(self.self_ref(), |inner| &inner.trailers)
+ }
+
+ /// Take the response.
+ fn take_response(&self) -> Response {
+ self.self_mut().response.take().unwrap()
+ }
+
+ /// Get a reference to the connection properties.
+ pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> {
+ Ref::map(self.self_ref(), |inner| &inner.request_info)
+ }
+
+ /// Get a reference to the request parts.
+ pub fn request_parts(&self) -> Ref<'_, Parts> {
+ Ref::map(self.self_ref(), |inner| &inner.request_parts)
+ }
+
+ /// Get a reference to the completion handle.
+ fn response_ready(&self) -> impl Future<Output = ()> + '_ {
+ struct HttpRecordComplete<'a>(&'a HttpRecord);
+
+ impl<'a> Future for HttpRecordComplete<'a> {
+ type Output = ();
+
+ fn poll(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let mut mut_self = self.0 .0.borrow_mut();
+ if mut_self.response_ready {
+ return std::task::Poll::Ready(());
+ }
+ mut_self.response_waker = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+ }
+
+ HttpRecordComplete(self)
+ }
+
+ /// Get a reference to the response body completion handle.
+ pub fn body_promise(&self) -> CompletionHandle {
+ self
+ .self_ref()
+ .response
+ .as_ref()
+ .unwrap()
+ .body()
+ .completion_handle()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::hyper_util_tokioio::TokioIo;
+ use crate::response_body::Compression;
+ use crate::response_body::ResponseBytesInner;
+ use bytes::Buf;
+ use deno_net::raw::NetworkStreamType;
+ use hyper1::body::Body;
+ use hyper1::service::service_fn;
+ use hyper1::service::HttpService;
+ use std::error::Error as StdError;
+
+ /// Execute client request on service and concurrently map the response.
+ async fn serve_request<B, S, T, F>(
+ req: http::Request<B>,
+ service: S,
+ map_response: impl FnOnce(hyper1::Response<Incoming>) -> F,
+ ) -> hyper1::Result<T>
+ where
+ B: Body + Send + 'static, // Send bound due to DuplexStream
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S: HttpService<Incoming>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S::ResBody: 'static,
+ <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
+ F: std::future::Future<Output = hyper1::Result<T>>,
+ {
+ use hyper1::client::conn::http1::handshake;
+ use hyper1::server::conn::http1::Builder;
+ let (stream_client, stream_server) = tokio::io::duplex(16 * 1024);
+ let conn_server =
+ Builder::new().serve_connection(TokioIo::new(stream_server), service);
+ let (mut sender, conn_client) =
+ handshake(TokioIo::new(stream_client)).await?;
+
+ let (res, _, _) = tokio::try_join!(
+ async move {
+ let res = sender.send_request(req).await?;
+ map_response(res).await
+ },
+ conn_server,
+ conn_client,
+ )?;
+ Ok(res)
+ }
+
+ #[tokio::test]
+ async fn test_handle_request() -> Result<(), AnyError> {
+ let (tx, mut rx) = tokio::sync::mpsc::channel(10);
+ let refcount = RefCount::default();
+ let refcount_check = refcount.clone();
+ let request_info = HttpConnectionProperties {
+ peer_address: "".into(),
+ peer_port: None,
+ local_port: None,
+ stream_type: NetworkStreamType::Tcp,
+ };
+ let svc = service_fn(move |req: hyper1::Request<Incoming>| {
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
+ });
+
+ let client_req = http::Request::builder().uri("/").body("".to_string())?;
+
+ // Response produced by concurrent tasks
+ tokio::try_join!(
+ async move {
+ // JavaScript handler produces response
+ let record = rx.recv().await.unwrap();
+ let resource = record.take_resource();
+ record.response().body_mut().initialize(
+ ResponseBytesInner::from_vec(
+ Compression::None,
+ b"hello world".to_vec(),
+ ),
+ resource,
+ );
+ record.complete();
+ Ok(())
+ },
+ // Server connection executes service
+ async move {
+ serve_request(client_req, svc, |res| async {
+ // Client reads the response
+ use http_body_util::BodyExt;
+ assert_eq!(res.status(), 200);
+ let body = res.collect().await?.to_bytes();
+ assert_eq!(body.chunk(), b"hello world");
+ Ok(())
+ })
+ .await
+ },
+ )?;
+ assert_eq!(Rc::strong_count(&refcount_check.0), 1);
+ Ok(())
+ }
+}