From e450c6b7cb23be95862d48ef047490490391c9d9 Mon Sep 17 00:00:00 2001 From: Satya Rohith Date: Thu, 23 May 2024 23:01:20 +0530 Subject: fix(ext/node): return cancelled flag in get_response_body_chunk op (#23962) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flag lets us exit from read loop without throwing an error when the stream is cancelled. This fixes gRPC cancellation example. Co-authored-by: Bartek IwaƄczuk --- ext/node/ops/http2.rs | 19 +++++++++++++++---- ext/node/polyfills/http2.ts | 11 ++++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) (limited to 'ext') diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs index e0de8e474..d51da3b43 100644 --- a/ext/node/ops/http2.rs +++ b/ext/node/ops/http2.rs @@ -24,6 +24,7 @@ use deno_core::ResourceId; use deno_net::raw::take_network_stream_resource; use deno_net::raw::NetworkStream; use h2; +use h2::Reason; use h2::RecvStream; use http_v02; use http_v02::request::Parts; @@ -496,7 +497,7 @@ fn poll_data_or_trailers( pub async fn op_http2_client_get_response_body_chunk( state: Rc>, #[smi] body_rid: ResourceId, -) -> Result<(Option>, bool), AnyError> { +) -> Result<(Option>, bool, bool), AnyError> { let resource = state .borrow() .resource_table @@ -504,9 +505,19 @@ pub async fn op_http2_client_get_response_body_chunk( let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; loop { - match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? { + let result = poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await; + if let Err(err) = result { + let reason = err.reason(); + if let Some(reason) = reason { + if reason == Reason::CANCEL { + return Ok((None, false, true)); + } + } + return Err(err.into()); + } + match result.unwrap() { DataOrTrailers::Data(data) => { - return Ok((Some(data.to_vec()), false)); + return Ok((Some(data.to_vec()), false, false)); } DataOrTrailers::Trailers(trailers) => { if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx) @@ -524,7 +535,7 @@ pub async fn op_http2_client_get_response_body_chunk( .borrow_mut() .await .take(); - return Ok((None, true)); + return Ok((None, true, false)); } }; } diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 59756dd0f..c47e54d1b 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -1007,9 +1007,14 @@ export class ClientHttp2Stream extends Duplex { debugHttp2(">>> read"); (async () => { - const [chunk, finished] = await op_http2_client_get_response_body_chunk( - this[kDenoResponse].bodyRid, - ); + const [chunk, finished, cancelled] = + await op_http2_client_get_response_body_chunk( + this[kDenoResponse].bodyRid, + ); + + if (cancelled) { + return; + } debugHttp2(">>> chunk", chunk, finished, this[kDenoResponse].bodyRid); if (chunk === null) { -- cgit v1.2.3