From 5457e741fae8272901d277836a396a52fada86da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 4 Jun 2021 01:32:36 +0200 Subject: fix: hang in op_http_next_request (#10836) This commit adds "CancelHandle" to "ConnResource" and changes "op_http_next_request" to await for the cancel signal. In turn when async iterating over "Deno.HttpConn" the iterator breaks upon closing of the resource. --- runtime/ops/http.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'runtime/ops') diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index e4ba2db2a..fedcb404f 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -14,7 +14,6 @@ use deno_core::futures::StreamExt; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; -use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -107,6 +106,7 @@ struct ConnResource { hyper_connection: ConnType, deno_service: Service, addr: SocketAddr, + cancel: CancelHandle, } impl ConnResource { @@ -124,6 +124,10 @@ impl Resource for ConnResource { fn name(&self) -> Cow { "httpConnection".into() } + + fn close(self: Rc) { + self.cancel.cancel() + } } // We use a tuple instead of struct to avoid serialization overhead of the keys. @@ -153,6 +157,8 @@ async fn op_http_request_next( .get::(conn_rid) .ok_or_else(bad_resource_id)?; + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + poll_fn(|cx| { let connection_closed = match conn_resource.poll(cx) { Poll::Pending => false, @@ -257,6 +263,7 @@ async fn op_http_request_next( Poll::Pending } }) + .try_or_cancel(cancel) .await .map_err(AnyError::from) } @@ -298,6 +305,7 @@ fn op_http_start( hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), deno_service, addr, + cancel: CancelHandle::default(), }; let rid = state.resource_table.add(conn_resource); return Ok(rid); @@ -320,6 +328,7 @@ fn op_http_start( hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), deno_service, addr, + cancel: CancelHandle::default(), }; let rid = state.resource_table.add(conn_resource); return Ok(rid); @@ -381,7 +390,6 @@ async fn op_http_response( let response_body_rid = state.borrow_mut().resource_table.add(ResponseBodyResource { body: AsyncRefCell::new(sender), - cancel: CancelHandle::default(), conn_rid: response_sender.conn_rid, }); @@ -484,12 +492,8 @@ async fn op_http_response_write( .ok_or_else(bad_resource_id)?; let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut send_data_fut = body - .send_data(Vec::from(&*buf).into()) - .or_cancel(cancel) - .boxed_local(); + let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); poll_fn(|cx| { if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { @@ -501,8 +505,7 @@ async fn op_http_response_write( send_data_fut.poll_unpin(cx).map_err(AnyError::from) }) - .await? - .unwrap(); // panic on send_data error + .await?; Ok(()) } @@ -520,6 +523,10 @@ impl Resource for RequestBodyResource { fn name(&self) -> Cow { "requestBody".into() } + + fn close(self: Rc) { + self.cancel.cancel() + } } struct ResponseSenderResource { @@ -535,7 +542,6 @@ impl Resource for ResponseSenderResource { struct ResponseBodyResource { body: AsyncRefCell, - cancel: CancelHandle, conn_rid: ResourceId, } -- cgit v1.2.3