summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs248
1 files changed, 189 insertions, 59 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 476a55a80..94f6f1241 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
+use crate::slab::http_trace;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
use crate::slab::slab_insert;
use crate::slab::HttpRequestBodyAutocloser;
+use crate::slab::RefCount;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
@@ -70,6 +72,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
+use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -690,7 +693,10 @@ pub async fn op_http_track(
.resource_table
.get::<HttpJoinHandle>(server_rid)?;
- match handle.or_cancel(join_handle.cancel_handle()).await {
+ match handle
+ .or_cancel(join_handle.connection_cancel_handle())
+ .await
+ {
Ok(true) => Ok(()),
Ok(false) => {
Err(AnyError::msg("connection closed before message completed"))
@@ -705,14 +711,17 @@ pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
+ refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> {
- let index = slab_insert(request, request_info);
+ let index = slab_insert(request, request_info, refcount);
let rx = slab_get(index).promise();
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
+ http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
rx.await;
+ http_trace!(index, "SlabFuture complete");
}
})
}
@@ -745,45 +754,75 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
.keep_alive(true)
.writev(*USE_WRITEV)
- .serve_connection(TokioIo::new(io), svc);
-
- conn.with_upgrades().map_err(AnyError::from)
+ .serve_connection(TokioIo::new(io), svc)
+ .with_upgrades();
+
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
-) -> impl Future<Output = Result<(), AnyError>> + 'static {
+ cancel: Rc<CancelHandle>,
+) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
- conn.map_err(AnyError::from)
+ async {
+ match conn.or_abort(cancel).await {
+ Err(mut conn) => {
+ Pin::new(&mut conn).graceful_shutdown();
+ conn.await
+ }
+ Ok(res) => res,
+ }
+ }
}
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, cancel)
+ .await
+ .map_err(|e| e.into())
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -792,33 +831,46 @@ fn serve_https(
// based on the prefix bytes
let handshake = io.get_ref().1.alpn_protocol();
if handshake == Some(TLS_ALPN_HTTP_2) {
- serve_http2_unconditional(io, svc).await
+ serve_http2_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else if handshake == Some(TLS_ALPN_HTTP_11) {
- serve_http11_unconditional(io, svc).await
+ serve_http11_unconditional(io, svc, listen_cancel_handle)
+ .await
+ .map_err(|e| e.into())
} else {
- serve_http2_autodetect(io, svc).await
+ serve_http2_autodetect(io, svc, listen_cancel_handle).await
}
}
- .try_or_cancel(cancel),
+ .try_or_cancel(connection_cancel_handle),
)
}
fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
+ let HttpLifetime {
+ refcount,
+ connection_cancel_handle,
+ listen_cancel_handle,
+ } = lifetime;
+
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), tx.clone())
+ new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
- spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
+ spawn(
+ serve_http2_autodetect(io, svc, listen_cancel_handle)
+ .try_or_cancel(connection_cancel_handle),
+ )
}
fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
- cancel: Rc<CancelHandle>,
+ lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>>
where
@@ -831,28 +883,58 @@ where
match network_stream {
NetworkStream::Tcp(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
NetworkStream::Tls(conn) => {
- serve_https(conn, connection_properties, cancel, tx)
+ serve_https(conn, connection_properties, lifetime, tx)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
- serve_http(conn, connection_properties, cancel, tx)
+ serve_http(conn, connection_properties, lifetime, tx)
}
}
}
-struct HttpJoinHandle(
- AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
- // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
- Rc<CancelHandle>,
- AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
-);
+#[derive(Clone)]
+struct HttpLifetime {
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ refcount: RefCount,
+}
+
+struct HttpJoinHandle {
+ join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ connection_cancel_handle: Rc<CancelHandle>,
+ listen_cancel_handle: Rc<CancelHandle>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ refcount: RefCount,
+}
impl HttpJoinHandle {
- fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
- self.1.clone()
+ fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ Self {
+ join_handle: AsyncRefCell::new(None),
+ connection_cancel_handle: CancelHandle::new_rc(),
+ listen_cancel_handle: CancelHandle::new_rc(),
+ rx: AsyncRefCell::new(rx),
+ refcount: RefCount::default(),
+ }
+ }
+
+ fn lifetime(self: &Rc<Self>) -> HttpLifetime {
+ HttpLifetime {
+ connection_cancel_handle: self.connection_cancel_handle.clone(),
+ listen_cancel_handle: self.listen_cancel_handle.clone(),
+ refcount: self.refcount.clone(),
+ }
+ }
+
+ fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.connection_cancel_handle.clone()
+ }
+
+ fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.listen_cancel_handle.clone()
}
}
@@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle {
}
fn close(self: Rc<Self>) {
- self.1.cancel()
+ // During a close operation, we cancel everything
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
impl Drop for HttpJoinHandle {
fn drop(&mut self) {
// In some cases we may be dropped without closing, so let's cancel everything on the way out
- self.1.cancel();
+ self.connection_cancel_handle.cancel();
+ self.listen_cancel_handle.cancel();
}
}
@@ -890,23 +975,21 @@ where
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
- let cancel_clone = resource.cancel_handle();
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
+ let listen_cancel_clone = resource.listen_cancel_handle();
+
+ let lifetime = resource.lifetime();
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
- .try_or_cancel(cancel_clone.clone())
+ .try_or_cancel(listen_cancel_clone.clone())
.await?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
- cancel_clone.clone(),
+ lifetime.clone(),
tx.clone(),
);
}
@@ -915,7 +998,7 @@ where
});
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -943,22 +1026,18 @@ where
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
- let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
- AsyncRefCell::new(None),
- CancelHandle::new_rc(),
- AsyncRefCell::new(rx),
- ));
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>(
connection,
&listen_properties,
- resource.cancel_handle(),
+ resource.lifetime(),
tx,
);
// Set the handle after we start the future
- *RcRef::map(&resource, |this| &this.0)
+ *RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
@@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
// If join handle is somehow locked, just abort.
let Some(mut handle) =
- RcRef::map(&join_handle, |this| &this.2).try_borrow_mut()
+ RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
return SlabId::MAX;
};
@@ -1006,9 +1085,9 @@ pub async fn op_http_wait(
.resource_table
.get::<HttpJoinHandle>(rid)?;
- let cancel = join_handle.cancel_handle();
+ let cancel = join_handle.listen_cancel_handle();
let next = async {
- let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
+ let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
recv.recv().await
}
.or_cancel(cancel)
@@ -1021,19 +1100,13 @@ pub async fn op_http_wait(
}
// No - we're shutting down
- let res = RcRef::map(join_handle, |this| &this.0)
+ let res = RcRef::map(join_handle, |this| &this.join_handle)
.borrow_mut()
.await
.take()
.unwrap()
.await?;
- // Drop the cancel and join handles
- state
- .borrow_mut()
- .resource_table
- .take::<HttpJoinHandle>(rid)?;
-
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
if let Some(err) = err.source() {
@@ -1049,6 +1122,63 @@ pub async fn op_http_wait(
Ok(SlabId::MAX)
}
+/// Cancels the HTTP handle.
+#[op2(fast)]
+pub fn op_http_cancel(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ Ok(())
+}
+
+#[op2(async)]
+pub async fn op_http_close(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ graceful: bool,
+) -> Result<(), AnyError> {
+ let join_handle = state
+ .borrow_mut()
+ .resource_table
+ .take::<HttpJoinHandle>(rid)?;
+
+ if graceful {
+ deno_net::check_unstable2(&state, "Deno.Server.shutdown");
+ // In a graceful shutdown, we close the listener and allow all the remaining connections to drain
+ join_handle.listen_cancel_handle().cancel();
+ } else {
+ // In a forceful shutdown, we close everything
+ join_handle.listen_cancel_handle().cancel();
+ join_handle.connection_cancel_handle().cancel();
+ }
+
+ // Async spin on the refcount while we wait for everything to drain
+ while Rc::strong_count(&join_handle.refcount.0) > 1 {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+
+ let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
+ .borrow_mut()
+ .await;
+ if let Some(join_handle) = join_handle.take() {
+ join_handle.await??;
+ }
+
+ Ok(())
+}
+
struct UpgradeStream {
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,