summaryrefslogtreecommitdiff
path: root/extensions/fetch/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/fetch/lib.rs')
-rw-r--r--extensions/fetch/lib.rs53
1 files changed, 43 insertions, 10 deletions
diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs
index 0ac6422e4..2fbd38b3a 100644
--- a/extensions/fetch/lib.rs
+++ b/extensions/fetch/lib.rs
@@ -16,6 +16,7 @@ use deno_core::AsyncRefCell;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
+use deno_core::Canceled;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
@@ -131,6 +132,7 @@ pub struct FetchArgs {
pub struct FetchReturn {
request_rid: ResourceId,
request_body_rid: Option<ResourceId>,
+ cancel_handle_rid: Option<ResourceId>,
}
pub fn op_fetch<FP>(
@@ -157,7 +159,7 @@ where
// Check scheme before asking for net permission
let scheme = url.scheme();
- let (request_rid, request_body_rid) = match scheme {
+ let (request_rid, request_body_rid, cancel_handle_rid) = match scheme {
"http" | "https" => {
let permissions = state.borrow_mut::<FP>();
permissions.check_net_url(&url)?;
@@ -195,13 +197,19 @@ where
request = request.header(name, v);
}
- let fut = request.send();
+ let cancel_handle = CancelHandle::new_rc();
+ let cancel_handle_ = cancel_handle.clone();
+
+ let fut = async move { request.send().or_cancel(cancel_handle_).await };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, request_body_rid)
+ let cancel_handle_rid =
+ state.resource_table.add(FetchCancelHandle(cancel_handle));
+
+ (request_rid, request_body_rid, Some(cancel_handle_rid))
}
"data" => {
let data_url = DataUrl::process(url.as_str())
@@ -216,13 +224,13 @@ where
.header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
.body(reqwest::Body::from(body))?;
- let fut = async move { Ok(Response::from(response)) };
+ let fut = async move { Ok(Ok(Response::from(response))) };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, None)
+ (request_rid, None, None)
}
"blob" => {
let blob_url_storage =
@@ -244,13 +252,13 @@ where
.header(http::header::CONTENT_TYPE, blob.media_type)
.body(reqwest::Body::from(blob.data))?;
- let fut = async move { Ok(Response::from(response)) };
+ let fut = async move { Ok(Ok(Response::from(response))) };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, None)
+ (request_rid, None, None)
}
_ => return Err(type_error(format!("scheme '{}' not supported", scheme))),
};
@@ -258,6 +266,7 @@ where
Ok(FetchReturn {
request_rid,
request_body_rid,
+ cancel_handle_rid,
})
}
@@ -287,8 +296,9 @@ pub async fn op_fetch_send(
.expect("multiple op_fetch_send ongoing");
let res = match request.0.await {
- Ok(res) => res,
- Err(e) => return Err(type_error(e.to_string())),
+ Ok(Ok(res)) => res,
+ Ok(Err(err)) => return Err(type_error(err.to_string())),
+ Err(_) => return Err(type_error("request was cancelled")),
};
//debug!("Fetch response {}", url);
@@ -372,8 +382,11 @@ pub async fn op_fetch_response_read(
Ok(read)
}
+type CancelableResponseResult =
+ Result<Result<Response, reqwest::Error>, Canceled>;
+
struct FetchRequestResource(
- Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>>>>,
+ Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
);
impl Resource for FetchRequestResource {
@@ -382,6 +395,18 @@ impl Resource for FetchRequestResource {
}
}
+struct FetchCancelHandle(Rc<CancelHandle>);
+
+impl Resource for FetchCancelHandle {
+ fn name(&self) -> Cow<str> {
+ "fetchCancelHandle".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.0.cancel()
+ }
+}
+
struct FetchRequestBodyResource {
body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>,
cancel: CancelHandle,
@@ -391,6 +416,10 @@ impl Resource for FetchRequestBodyResource {
fn name(&self) -> Cow<str> {
"fetchRequestBody".into()
}
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
}
type BytesStream =
@@ -405,6 +434,10 @@ impl Resource for FetchResponseBodyResource {
fn name(&self) -> Cow<str> {
"fetchResponseBody".into()
}
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
}
struct HttpClientResource {