diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/http/http_next.rs | 73 |
1 files changed, 39 insertions, 34 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 593a9c816..5ed443142 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -625,83 +625,80 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> { fn serve_http11_unconditional( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, - cancel: RcRef<CancelHandle>, ) -> impl Future<Output = Result<(), AnyError>> + 'static { let conn = http1::Builder::new() .keep_alive(true) .serve_connection(io, svc); - conn - .with_upgrades() - .map_err(AnyError::from) - .try_or_cancel(cancel) + conn.with_upgrades().map_err(AnyError::from) } fn serve_http2_unconditional( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, - cancel: RcRef<CancelHandle>, ) -> impl Future<Output = Result<(), AnyError>> + 'static { let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc); - conn.map_err(AnyError::from).try_or_cancel(cancel) + conn.map_err(AnyError::from) } async fn serve_http2_autodetect( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, - cancel: RcRef<CancelHandle>, ) -> Result<(), AnyError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let (matches, io) = prefix.match_prefix().await?; if matches { - serve_http2_unconditional(io, svc, cancel).await + serve_http2_unconditional(io, svc).await } else { - serve_http11_unconditional(io, svc, cancel).await + serve_http11_unconditional(io, svc).await } } fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, - cancel: RcRef<CancelHandle>, + cancel: Rc<CancelHandle>, tx: tokio::sync::mpsc::Sender<u32>, ) -> JoinHandle<Result<(), AnyError>> { // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(async { - io.handshake().await?; - // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect - // based on the prefix bytes - let handshake = io.get_ref().1.alpn_protocol(); - if handshake == Some(TLS_ALPN_HTTP_2) { - serve_http2_unconditional(io, svc, cancel).await - } else if handshake == Some(TLS_ALPN_HTTP_11) { - serve_http11_unconditional(io, svc, cancel).await - } else { - serve_http2_autodetect(io, svc, cancel).await + spawn_local( + async { + io.handshake().await?; + // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect + // based on the prefix bytes + let handshake = io.get_ref().1.alpn_protocol(); + if handshake == Some(TLS_ALPN_HTTP_2) { + serve_http2_unconditional(io, svc).await + } else if handshake == Some(TLS_ALPN_HTTP_11) { + serve_http11_unconditional(io, svc).await + } else { + serve_http2_autodetect(io, svc).await + } } - }) + .try_or_cancel(cancel), + ) } fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, - cancel: RcRef<CancelHandle>, + cancel: Rc<CancelHandle>, tx: tokio::sync::mpsc::Sender<u32>, ) -> JoinHandle<Result<(), AnyError>> { // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(serve_http2_autodetect(io, svc, cancel)) + spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) } fn serve_http_on( network_stream: NetworkStream, listen_properties: &HttpListenProperties, - cancel: RcRef<CancelHandle>, + cancel: Rc<CancelHandle>, tx: tokio::sync::mpsc::Sender<u32>, ) -> JoinHandle<Result<(), AnyError>> { // We always want some sort of peer address. If we can't get one, just make up one. @@ -733,13 +730,14 @@ fn serve_http_on( struct HttpJoinHandle( AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, - CancelHandle, + // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd + Rc<CancelHandle>, AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>, ); impl HttpJoinHandle { - fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { - RcRef::map(self, |this| &this.1) + fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> { + self.1.clone() } } @@ -753,6 +751,13 @@ impl Resource for HttpJoinHandle { } } +impl Drop for HttpJoinHandle { + fn drop(&mut self) { + // In some cases we may be dropped without closing, so let's cancel everything on the way out + self.1.cancel(); + } +} + #[op(v8)] pub fn op_serve_http( state: Rc<RefCell<OpState>>, @@ -773,12 +778,12 @@ pub fn op_serve_http( let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle( AsyncRefCell::new(None), - CancelHandle::new(), + CancelHandle::new_rc(), AsyncRefCell::new(rx), )); let cancel_clone = resource.cancel_handle(); - let listen_properties_clone = listen_properties.clone(); + let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let handle = spawn_local(async move { loop { let conn = listener @@ -813,7 +818,7 @@ pub fn op_serve_http_on( state: Rc<RefCell<OpState>>, conn: ResourceId, ) -> Result<(ResourceId, &'static str, String), AnyError> { - let network_stream = + let network_stream: NetworkStream = DefaultHttpRequestProperties::get_network_stream_for_rid( &mut state.borrow_mut(), conn, @@ -828,7 +833,7 @@ pub fn op_serve_http_on( let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle( AsyncRefCell::new(None), - CancelHandle::new(), + CancelHandle::new_rc(), AsyncRefCell::new(rx), )); @@ -862,7 +867,7 @@ pub async fn op_http_wait( .resource_table .get::<HttpJoinHandle>(rid)?; - let cancel = join_handle.clone().cancel_handle(); + let cancel = join_handle.cancel_handle(); let next = async { let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await; recv.recv().await |