diff options
Diffstat (limited to 'extensions/fetch/lib.rs')
-rw-r--r-- | extensions/fetch/lib.rs | 53 |
1 files changed, 43 insertions, 10 deletions
diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs index 0ac6422e4..2fbd38b3a 100644 --- a/extensions/fetch/lib.rs +++ b/extensions/fetch/lib.rs @@ -16,6 +16,7 @@ use deno_core::AsyncRefCell; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; +use deno_core::Canceled; use deno_core::Extension; use deno_core::OpState; use deno_core::RcRef; @@ -131,6 +132,7 @@ pub struct FetchArgs { pub struct FetchReturn { request_rid: ResourceId, request_body_rid: Option<ResourceId>, + cancel_handle_rid: Option<ResourceId>, } pub fn op_fetch<FP>( @@ -157,7 +159,7 @@ where // Check scheme before asking for net permission let scheme = url.scheme(); - let (request_rid, request_body_rid) = match scheme { + let (request_rid, request_body_rid, cancel_handle_rid) = match scheme { "http" | "https" => { let permissions = state.borrow_mut::<FP>(); permissions.check_net_url(&url)?; @@ -195,13 +197,19 @@ where request = request.header(name, v); } - let fut = request.send(); + let cancel_handle = CancelHandle::new_rc(); + let cancel_handle_ = cancel_handle.clone(); + + let fut = async move { request.send().or_cancel(cancel_handle_).await }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, request_body_rid) + let cancel_handle_rid = + state.resource_table.add(FetchCancelHandle(cancel_handle)); + + (request_rid, request_body_rid, Some(cancel_handle_rid)) } "data" => { let data_url = DataUrl::process(url.as_str()) @@ -216,13 +224,13 @@ where .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) .body(reqwest::Body::from(body))?; - let fut = async move { Ok(Response::from(response)) }; + let fut = async move { Ok(Ok(Response::from(response))) }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None) + (request_rid, None, None) } "blob" => { let blob_url_storage = @@ -244,13 +252,13 @@ where .header(http::header::CONTENT_TYPE, blob.media_type) .body(reqwest::Body::from(blob.data))?; - let fut = async move { Ok(Response::from(response)) }; + let fut = async move { Ok(Ok(Response::from(response))) }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None) + (request_rid, None, None) } _ => return Err(type_error(format!("scheme '{}' not supported", scheme))), }; @@ -258,6 +266,7 @@ where Ok(FetchReturn { request_rid, request_body_rid, + cancel_handle_rid, }) } @@ -287,8 +296,9 @@ pub async fn op_fetch_send( .expect("multiple op_fetch_send ongoing"); let res = match request.0.await { - Ok(res) => res, - Err(e) => return Err(type_error(e.to_string())), + Ok(Ok(res)) => res, + Ok(Err(err)) => return Err(type_error(err.to_string())), + Err(_) => return Err(type_error("request was cancelled")), }; //debug!("Fetch response {}", url); @@ -372,8 +382,11 @@ pub async fn op_fetch_response_read( Ok(read) } +type CancelableResponseResult = + Result<Result<Response, reqwest::Error>, Canceled>; + struct FetchRequestResource( - Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>>>>, + Pin<Box<dyn Future<Output = CancelableResponseResult>>>, ); impl Resource for FetchRequestResource { @@ -382,6 +395,18 @@ impl Resource for FetchRequestResource { } } +struct FetchCancelHandle(Rc<CancelHandle>); + +impl Resource for FetchCancelHandle { + fn name(&self) -> Cow<str> { + "fetchCancelHandle".into() + } + + fn close(self: Rc<Self>) { + self.0.cancel() + } +} + struct FetchRequestBodyResource { body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>, cancel: CancelHandle, @@ -391,6 +416,10 @@ impl Resource for FetchRequestBodyResource { fn name(&self) -> Cow<str> { "fetchRequestBody".into() } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } } type BytesStream = @@ -405,6 +434,10 @@ impl Resource for FetchResponseBodyResource { fn name(&self) -> Cow<str> { "fetchResponseBody".into() } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } } struct HttpClientResource { |