summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/http/http_next.rs73
1 files changed, 39 insertions, 34 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 593a9c816..5ed443142 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -625,83 +625,80 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
- cancel: RcRef<CancelHandle>,
) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http1::Builder::new()
.keep_alive(true)
.serve_connection(io, svc);
- conn
- .with_upgrades()
- .map_err(AnyError::from)
- .try_or_cancel(cancel)
+ conn.with_upgrades().map_err(AnyError::from)
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
- cancel: RcRef<CancelHandle>,
) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
- conn.map_err(AnyError::from).try_or_cancel(cancel)
+ conn.map_err(AnyError::from)
}
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
- cancel: RcRef<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc, cancel).await
+ serve_http2_unconditional(io, svc).await
} else {
- serve_http11_unconditional(io, svc, cancel).await
+ serve_http11_unconditional(io, svc).await
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
- cancel: RcRef<CancelHandle>,
+ cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
});
- spawn_local(async {
- io.handshake().await?;
- // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
- // based on the prefix bytes
- let handshake = io.get_ref().1.alpn_protocol();
- if handshake == Some(TLS_ALPN_HTTP_2) {
- serve_http2_unconditional(io, svc, cancel).await
- } else if handshake == Some(TLS_ALPN_HTTP_11) {
- serve_http11_unconditional(io, svc, cancel).await
- } else {
- serve_http2_autodetect(io, svc, cancel).await
+ spawn_local(
+ async {
+ io.handshake().await?;
+ // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
+ // based on the prefix bytes
+ let handshake = io.get_ref().1.alpn_protocol();
+ if handshake == Some(TLS_ALPN_HTTP_2) {
+ serve_http2_unconditional(io, svc).await
+ } else if handshake == Some(TLS_ALPN_HTTP_11) {
+ serve_http11_unconditional(io, svc).await
+ } else {
+ serve_http2_autodetect(io, svc).await
+ }
}
- })
+ .try_or_cancel(cancel),
+ )
}
fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
- cancel: RcRef<CancelHandle>,
+ cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
});
- spawn_local(serve_http2_autodetect(io, svc, cancel))
+ spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
}
fn serve_http_on(
network_stream: NetworkStream,
listen_properties: &HttpListenProperties,
- cancel: RcRef<CancelHandle>,
+ cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// We always want some sort of peer address. If we can't get one, just make up one.
@@ -733,13 +730,14 @@ fn serve_http_on(
struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
- CancelHandle,
+ // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
+ Rc<CancelHandle>,
AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>,
);
impl HttpJoinHandle {
- fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
- RcRef::map(self, |this| &this.1)
+ fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.1.clone()
}
}
@@ -753,6 +751,13 @@ impl Resource for HttpJoinHandle {
}
}
+impl Drop for HttpJoinHandle {
+ fn drop(&mut self) {
+ // In some cases we may be dropped without closing, so let's cancel everything on the way out
+ self.1.cancel();
+ }
+}
+
#[op(v8)]
pub fn op_serve_http(
state: Rc<RefCell<OpState>>,
@@ -773,12 +778,12 @@ pub fn op_serve_http(
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None),
- CancelHandle::new(),
+ CancelHandle::new_rc(),
AsyncRefCell::new(rx),
));
let cancel_clone = resource.cancel_handle();
- let listen_properties_clone = listen_properties.clone();
+ let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn_local(async move {
loop {
let conn = listener
@@ -813,7 +818,7 @@ pub fn op_serve_http_on(
state: Rc<RefCell<OpState>>,
conn: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> {
- let network_stream =
+ let network_stream: NetworkStream =
DefaultHttpRequestProperties::get_network_stream_for_rid(
&mut state.borrow_mut(),
conn,
@@ -828,7 +833,7 @@ pub fn op_serve_http_on(
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None),
- CancelHandle::new(),
+ CancelHandle::new_rc(),
AsyncRefCell::new(rx),
));
@@ -862,7 +867,7 @@ pub async fn op_http_wait(
.resource_table
.get::<HttpJoinHandle>(rid)?;
- let cancel = join_handle.clone().cancel_handle();
+ let cancel = join_handle.cancel_handle();
let next = async {
let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
recv.recv().await