diff options
Diffstat (limited to 'runtime/ops/http.rs')
-rw-r--r-- | runtime/ops/http.rs | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index e4ba2db2a..fedcb404f 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -14,7 +14,6 @@ use deno_core::futures::StreamExt; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; -use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -107,6 +106,7 @@ struct ConnResource { hyper_connection: ConnType, deno_service: Service, addr: SocketAddr, + cancel: CancelHandle, } impl ConnResource { @@ -124,6 +124,10 @@ impl Resource for ConnResource { fn name(&self) -> Cow<str> { "httpConnection".into() } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } } // We use a tuple instead of struct to avoid serialization overhead of the keys. @@ -153,6 +157,8 @@ async fn op_http_request_next( .get::<ConnResource>(conn_rid) .ok_or_else(bad_resource_id)?; + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + poll_fn(|cx| { let connection_closed = match conn_resource.poll(cx) { Poll::Pending => false, @@ -257,6 +263,7 @@ async fn op_http_request_next( Poll::Pending } }) + .try_or_cancel(cancel) .await .map_err(AnyError::from) } @@ -298,6 +305,7 @@ fn op_http_start( hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), deno_service, addr, + cancel: CancelHandle::default(), }; let rid = state.resource_table.add(conn_resource); return Ok(rid); @@ -320,6 +328,7 @@ fn op_http_start( hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), deno_service, addr, + cancel: CancelHandle::default(), }; let rid = state.resource_table.add(conn_resource); return Ok(rid); @@ -381,7 +390,6 @@ async fn op_http_response( let response_body_rid = state.borrow_mut().resource_table.add(ResponseBodyResource { body: AsyncRefCell::new(sender), - cancel: CancelHandle::default(), conn_rid: response_sender.conn_rid, }); @@ -484,12 +492,8 @@ async fn op_http_response_write( .ok_or_else(bad_resource_id)?; let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut send_data_fut = body - .send_data(Vec::from(&*buf).into()) - .or_cancel(cancel) - .boxed_local(); + let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); poll_fn(|cx| { if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { @@ -501,8 +505,7 @@ async fn op_http_response_write( send_data_fut.poll_unpin(cx).map_err(AnyError::from) }) - .await? - .unwrap(); // panic on send_data error + .await?; Ok(()) } @@ -520,6 +523,10 @@ impl Resource for RequestBodyResource { fn name(&self) -> Cow<str> { "requestBody".into() } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } } struct ResponseSenderResource { @@ -535,7 +542,6 @@ impl Resource for ResponseSenderResource { struct ResponseBodyResource { body: AsyncRefCell<hyper::body::Sender>, - cancel: CancelHandle, conn_rid: ResourceId, } |