summaryrefslogtreecommitdiff
path: root/extensions/fetch
diff options
context:
space:
mode:
authorJimmy Wärting <jimmy@warting.se>2021-07-05 15:34:37 +0200
committerGitHub <noreply@github.com>2021-07-05 15:34:37 +0200
commit2c0b0e45b72ef1b5d7fa95e1e110d07ddbc720f7 (patch)
tree3b54a6f1f156f8d105cf41ac290035c8b5f8f1c9 /extensions/fetch
parentea87d860beda7cd40eb6857199a00e5ba8700fd2 (diff)
refactor: asynchronous blob backing store (#10969)
Co-authored-by: Luca Casonato <hello@lcas.dev>
Diffstat (limited to 'extensions/fetch')
-rw-r--r--extensions/fetch/26_fetch.js14
-rw-r--r--extensions/fetch/lib.rs66
2 files changed, 58 insertions, 22 deletions
diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js
index 47b07be0b..a33187344 100644
--- a/extensions/fetch/26_fetch.js
+++ b/extensions/fetch/26_fetch.js
@@ -152,15 +152,18 @@
if (req.body !== null) {
if (req.body.streamOrStatic instanceof ReadableStream) {
- if (req.body.length === null) {
+ if (req.body.length === null || req.body.source instanceof Blob) {
reqBody = req.body.stream;
} else {
const reader = req.body.stream.getReader();
const r1 = await reader.read();
- if (r1.done) throw new TypeError("Unreachable");
- reqBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
+ if (r1.done) {
+ reqBody = new Uint8Array(0);
+ } else {
+ reqBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
}
} else {
req.body.streamOrStatic.consumed = true;
@@ -174,6 +177,7 @@
headers: req.headerList,
clientRid: req.clientRid,
hasBody: reqBody !== null,
+ bodyLength: req.body?.length,
}, reqBody instanceof Uint8Array ? reqBody : null);
function onAbort() {
diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs
index 68fe7a20b..f870c58dc 100644
--- a/extensions/fetch/lib.rs
+++ b/extensions/fetch/lib.rs
@@ -26,7 +26,8 @@ use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use data_url::DataUrl;
-use deno_web::BlobUrlStore;
+use deno_web::BlobStore;
+use http::header::CONTENT_LENGTH;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -130,6 +131,7 @@ pub struct FetchArgs {
headers: Vec<(ByteString, ByteString)>,
client_rid: Option<u32>,
has_body: bool,
+ body_length: Option<u64>,
}
#[derive(Serialize)]
@@ -176,6 +178,14 @@ where
None => {
// If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+
+ // If the size of the body is known, we include a content-length
+ // header explicitly.
+ if let Some(body_size) = args.body_length {
+ request =
+ request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
+ }
+
request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
let request_body_rid =
@@ -207,7 +217,13 @@ where
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
- let fut = async move { request.send().or_cancel(cancel_handle_).await };
+ let fut = async move {
+ request
+ .send()
+ .or_cancel(cancel_handle_)
+ .await
+ .map(|res| res.map_err(|err| type_error(err.to_string())))
+ };
let request_rid = state
.resource_table
@@ -240,32 +256,49 @@ where
(request_rid, None, None)
}
"blob" => {
- let blob_url_storage =
- state.try_borrow::<BlobUrlStore>().ok_or_else(|| {
- type_error("Blob URLs are not supported in this context.")
- })?;
+ let blob_store = state.try_borrow::<BlobStore>().ok_or_else(|| {
+ type_error("Blob URLs are not supported in this context.")
+ })?;
- let blob = blob_url_storage
- .get(url)?
+ let blob = blob_store
+ .get_object_url(url)?
.ok_or_else(|| type_error("Blob for the given URL not found."))?;
if method != "GET" {
return Err(type_error("Blob URL fetch only supports GET method."));
}
- let response = http::Response::builder()
- .status(http::StatusCode::OK)
- .header(http::header::CONTENT_LENGTH, blob.data.len())
- .header(http::header::CONTENT_TYPE, blob.media_type)
- .body(reqwest::Body::from(blob.data))?;
+ let cancel_handle = CancelHandle::new_rc();
+ let cancel_handle_ = cancel_handle.clone();
- let fut = async move { Ok(Ok(Response::from(response))) };
+ let fut = async move {
+ // TODO(lucacsonato): this should be a stream!
+ let chunk = match blob.read_all().or_cancel(cancel_handle_).await? {
+ Ok(chunk) => chunk,
+ Err(err) => return Ok(Err(err)),
+ };
+
+ let res = http::Response::builder()
+ .status(http::StatusCode::OK)
+ .header(http::header::CONTENT_LENGTH, chunk.len())
+ .header(http::header::CONTENT_TYPE, blob.media_type.clone())
+ .body(reqwest::Body::from(chunk))
+ .map_err(|err| type_error(err.to_string()));
+
+ match res {
+ Ok(response) => Ok(Ok(Response::from(response))),
+ Err(err) => Ok(Err(err)),
+ }
+ };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, None, None)
+ let cancel_handle_rid =
+ state.resource_table.add(FetchCancelHandle(cancel_handle));
+
+ (request_rid, None, Some(cancel_handle_rid))
}
_ => return Err(type_error(format!("scheme '{}' not supported", scheme))),
};
@@ -382,8 +415,7 @@ pub async fn op_fetch_response_read(
Ok(read)
}
-type CancelableResponseResult =
- Result<Result<Response, reqwest::Error>, Canceled>;
+type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
struct FetchRequestResource(
Pin<Box<dyn Future<Output = CancelableResponseResult>>>,