diff options
Diffstat (limited to 'extensions/fetch')
-rw-r--r-- | extensions/fetch/26_fetch.js | 14 | ||||
-rw-r--r-- | extensions/fetch/lib.rs | 66 |
2 files changed, 58 insertions, 22 deletions
diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js index 47b07be0b..a33187344 100644 --- a/extensions/fetch/26_fetch.js +++ b/extensions/fetch/26_fetch.js @@ -152,15 +152,18 @@ if (req.body !== null) { if (req.body.streamOrStatic instanceof ReadableStream) { - if (req.body.length === null) { + if (req.body.length === null || req.body.source instanceof Blob) { reqBody = req.body.stream; } else { const reader = req.body.stream.getReader(); const r1 = await reader.read(); - if (r1.done) throw new TypeError("Unreachable"); - reqBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); + if (r1.done) { + reqBody = new Uint8Array(0); + } else { + reqBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } } } else { req.body.streamOrStatic.consumed = true; @@ -174,6 +177,7 @@ headers: req.headerList, clientRid: req.clientRid, hasBody: reqBody !== null, + bodyLength: req.body?.length, }, reqBody instanceof Uint8Array ? reqBody : null); function onAbort() { diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs index 68fe7a20b..f870c58dc 100644 --- a/extensions/fetch/lib.rs +++ b/extensions/fetch/lib.rs @@ -26,7 +26,8 @@ use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use data_url::DataUrl; -use deno_web::BlobUrlStore; +use deno_web::BlobStore; +use http::header::CONTENT_LENGTH; use reqwest::header::HeaderMap; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; @@ -130,6 +131,7 @@ pub struct FetchArgs { headers: Vec<(ByteString, ByteString)>, client_rid: Option<u32>, has_body: bool, + body_length: Option<u64>, } #[derive(Serialize)] @@ -176,6 +178,14 @@ where None => { // If no body is passed, we return a writer for streaming the body. let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); + + // If the size of the body is known, we include a content-length + // header explicitly. + if let Some(body_size) = args.body_length { + request = + request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) + } + request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); let request_body_rid = @@ -207,7 +217,13 @@ where let cancel_handle = CancelHandle::new_rc(); let cancel_handle_ = cancel_handle.clone(); - let fut = async move { request.send().or_cancel(cancel_handle_).await }; + let fut = async move { + request + .send() + .or_cancel(cancel_handle_) + .await + .map(|res| res.map_err(|err| type_error(err.to_string()))) + }; let request_rid = state .resource_table @@ -240,32 +256,49 @@ where (request_rid, None, None) } "blob" => { - let blob_url_storage = - state.try_borrow::<BlobUrlStore>().ok_or_else(|| { - type_error("Blob URLs are not supported in this context.") - })?; + let blob_store = state.try_borrow::<BlobStore>().ok_or_else(|| { + type_error("Blob URLs are not supported in this context.") + })?; - let blob = blob_url_storage - .get(url)? + let blob = blob_store + .get_object_url(url)? .ok_or_else(|| type_error("Blob for the given URL not found."))?; if method != "GET" { return Err(type_error("Blob URL fetch only supports GET method.")); } - let response = http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_LENGTH, blob.data.len()) - .header(http::header::CONTENT_TYPE, blob.media_type) - .body(reqwest::Body::from(blob.data))?; + let cancel_handle = CancelHandle::new_rc(); + let cancel_handle_ = cancel_handle.clone(); - let fut = async move { Ok(Ok(Response::from(response))) }; + let fut = async move { + // TODO(lucacsonato): this should be a stream! + let chunk = match blob.read_all().or_cancel(cancel_handle_).await? { + Ok(chunk) => chunk, + Err(err) => return Ok(Err(err)), + }; + + let res = http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_LENGTH, chunk.len()) + .header(http::header::CONTENT_TYPE, blob.media_type.clone()) + .body(reqwest::Body::from(chunk)) + .map_err(|err| type_error(err.to_string())); + + match res { + Ok(response) => Ok(Ok(Response::from(response))), + Err(err) => Ok(Err(err)), + } + }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None, None) + let cancel_handle_rid = + state.resource_table.add(FetchCancelHandle(cancel_handle)); + + (request_rid, None, Some(cancel_handle_rid)) } _ => return Err(type_error(format!("scheme '{}' not supported", scheme))), }; @@ -382,8 +415,7 @@ pub async fn op_fetch_response_read( Ok(read) } -type CancelableResponseResult = - Result<Result<Response, reqwest::Error>, Canceled>; +type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>; struct FetchRequestResource( Pin<Box<dyn Future<Output = CancelableResponseResult>>>, |