diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/fetch | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify the
implementation and unify it with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length parameter to `resourceForReadableStream`, which
translates to the resource's `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/fetch')
-rw-r--r-- | ext/fetch/26_fetch.js | 179 | ||||
-rw-r--r-- | ext/fetch/Cargo.toml | 1 | ||||
-rw-r--r-- | ext/fetch/lib.rs | 212 |
3 files changed, 135 insertions, 257 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index e586d9a3a..8a71d9bcf 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -14,11 +14,12 @@ const core = globalThis.Deno.core; const ops = core.ops; import * as webidl from "ext:deno_webidl/00_webidl.js"; import { byteLowerCase } from "ext:deno_web/00_infra.js"; -import { BlobPrototype } from "ext:deno_web/09_file.js"; import { errorReadableStream, + getReadableStreamResourceBacking, readableStreamForRid, ReadableStreamPrototype, + resourceForReadableStream, } from "ext:deno_web/06_streams.js"; import { extractBody, InnerBody } from "ext:deno_fetch/22_body.js"; import { processUrlList, toInnerRequest } from "ext:deno_fetch/23_request.js"; @@ -37,22 +38,17 @@ const { ArrayPrototypeSplice, ArrayPrototypeFilter, ArrayPrototypeIncludes, + Error, ObjectPrototypeIsPrototypeOf, Promise, PromisePrototypeThen, PromisePrototypeCatch, SafeArrayIterator, - SafeWeakMap, String, StringPrototypeStartsWith, StringPrototypeToLowerCase, TypeError, - Uint8Array, Uint8ArrayPrototype, - WeakMapPrototypeDelete, - WeakMapPrototypeGet, - WeakMapPrototypeHas, - WeakMapPrototypeSet, } = primordials; const REQUEST_BODY_HEADER_NAMES = [ @@ -62,28 +58,9 @@ const REQUEST_BODY_HEADER_NAMES = [ "content-type", ]; -const requestBodyReaders = new SafeWeakMap(); - -/** - * @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args - * @param {Uint8Array | null} body - * @returns {{ requestRid: number, requestBodyRid: number | null, cancelHandleRid: number | null }} - */ -function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) { - return ops.op_fetch( - method, - url, - headers, - clientRid, - hasBody, - bodyLength, - body, - ); -} - /** * @param {number} rid - * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>} + * @returns {Promise<{ status: number, statusText: string, 
headers: [string, string][], url: string, responseRid: number, error: string? }>} */ function opFetchSend(rid) { return core.opAsync("op_fetch_send", rid); @@ -145,154 +122,59 @@ async function mainFetch(req, recursive, terminator) { /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ let reqBody = null; - - if (req.body !== null) { - if ( - ObjectPrototypeIsPrototypeOf( - ReadableStreamPrototype, - req.body.streamOrStatic, - ) - ) { - if ( - req.body.length === null || - ObjectPrototypeIsPrototypeOf(BlobPrototype, req.body.source) - ) { - reqBody = req.body.stream; + let reqRid = null; + + if (req.body) { + const stream = req.body.streamOrStatic; + const body = stream.body; + + if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { + reqBody = body; + } else if (typeof body === "string") { + reqBody = core.encode(body); + } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { + const resourceBacking = getReadableStreamResourceBacking(stream); + if (resourceBacking) { + reqRid = resourceBacking.rid; } else { - const reader = req.body.stream.getReader(); - WeakMapPrototypeSet(requestBodyReaders, req, reader); - const r1 = await reader.read(); - if (r1.done) { - reqBody = new Uint8Array(0); - } else { - reqBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - WeakMapPrototypeDelete(requestBodyReaders, req); + reqRid = resourceForReadableStream(stream, req.body.length); } } else { - req.body.streamOrStatic.consumed = true; - reqBody = req.body.streamOrStatic.body; - // TODO(@AaronO): plumb support for StringOrBuffer all the way - reqBody = typeof reqBody === "string" ? 
core.encode(reqBody) : reqBody; + throw TypeError("invalid body"); } } - const { requestRid, requestBodyRid, cancelHandleRid } = opFetch( + const { requestRid, cancelHandleRid } = ops.op_fetch( req.method, req.currentUrl(), req.headerList, req.clientRid, - reqBody !== null, - req.body?.length, - ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, reqBody) ? reqBody : null, + reqBody !== null || reqRid !== null, + reqBody, + reqRid, ); function onAbort() { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); } - if (requestBodyRid !== null) { - core.tryClose(requestBodyRid); - } } terminator[abortSignal.add](onAbort); - - let requestSendError; - let requestSendErrorSet = false; - - async function propagateError(err, message) { - // TODO(lucacasonato): propagate error into response body stream - try { - await core.writeTypeError(requestBodyRid, message); - } catch (err) { - if (!requestSendErrorSet) { - requestSendErrorSet = true; - requestSendError = err; - } - } - if (!requestSendErrorSet) { - requestSendErrorSet = true; - requestSendError = err; - } - } - - if (requestBodyRid !== null) { - if ( - reqBody === null || - !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, reqBody) - ) { - throw new TypeError("Unreachable"); - } - const reader = reqBody.getReader(); - WeakMapPrototypeSet(requestBodyReaders, req, reader); - (async () => { - let done = false; - while (!done) { - let val; - try { - const res = await reader.read(); - done = res.done; - val = res.value; - } catch (err) { - if (terminator.aborted) break; - await propagateError(err, "failed to read"); - break; - } - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) { - const error = new TypeError( - "Item in request body ReadableStream is not a Uint8Array", - ); - await reader.cancel(error); - await propagateError(error, error.message); - break; - } - try { - await core.writeAll(requestBodyRid, val); - } catch (err) { - if (terminator.aborted) break; - await 
reader.cancel(err); - await propagateError(err, "failed to write"); - break; - } - } - if (done && !terminator.aborted) { - try { - await core.shutdown(requestBodyRid); - } catch (err) { - if (!terminator.aborted) { - await propagateError(err, "failed to flush"); - } - } - } - WeakMapPrototypeDelete(requestBodyReaders, req); - reader.releaseLock(); - core.tryClose(requestBodyRid); - })(); - } let resp; try { resp = await opFetchSend(requestRid); } catch (err) { if (terminator.aborted) return; - if (requestSendErrorSet) { - // if the request body stream errored, we want to propagate that error - // instead of the original error from opFetchSend - throw new TypeError("Failed to fetch: request body stream errored", { - cause: requestSendError, - }); - } - if (requestBodyRid !== null) { - core.tryClose(requestBodyRid); - } throw err; } finally { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); } } + // Re-throw any body errors + if (resp.error) { + throw new TypeError("body failed", { cause: new Error(resp.error) }); + } if (terminator.aborted) return abortedNetworkError(); processUrlList(req.urlList, req.urlListProcessed); @@ -510,9 +392,8 @@ function fetch(input, init = {}) { function abortFetch(request, responseObject, error) { if (request.body !== null) { - if (WeakMapPrototypeHas(requestBodyReaders, request)) { - WeakMapPrototypeGet(requestBodyReaders, request).cancel(error); - } else { + // Cancel the body if we haven't taken it as a resource yet + if (!request.body.streamOrStatic.locked) { request.body.cancel(error); } } diff --git a/ext/fetch/Cargo.toml b/ext/fetch/Cargo.toml index 901e9ce5f..ede514b97 100644 --- a/ext/fetch/Cargo.toml +++ b/ext/fetch/Cargo.toml @@ -20,6 +20,7 @@ deno_core.workspace = true deno_tls.workspace = true dyn-clone = "1" http.workspace = true +pin-project.workspace = true reqwest.workspace = true serde.workspace = true tokio.workspace = true diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 7cde5584f..6e1ecb5e4 
100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -11,6 +11,8 @@ use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use deno_core::anyhow::Error; use deno_core::error::type_error; @@ -21,13 +23,11 @@ use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::op2; -use deno_core::BufView; -use deno_core::WriteOutcome; - use deno_core::unsync::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -62,7 +62,6 @@ use serde::Deserialize; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::sync::mpsc; // Re-export reqwest and data_url pub use data_url; @@ -184,7 +183,6 @@ pub fn get_declaration() -> PathBuf { #[serde(rename_all = "camelCase")] pub struct FetchReturn { pub request_rid: ResourceId, - pub request_body_rid: Option<ResourceId>, pub cancel_handle_rid: Option<ResourceId>, } @@ -216,6 +214,59 @@ pub fn get_or_create_client_from_state( } } +#[allow(clippy::type_complexity)] +pub struct ResourceToBodyAdapter( + Rc<dyn Resource>, + Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>, +); + +impl ResourceToBodyAdapter { + pub fn new(resource: Rc<dyn Resource>) -> Self { + let future = resource.clone().read(64 * 1024); + Self(resource, Some(future)) + } +} + +// SAFETY: we only use this on a single-threaded executor +unsafe impl Send for ResourceToBodyAdapter {} +// SAFETY: we only use this on a single-threaded executor +unsafe impl Sync for ResourceToBodyAdapter {} + +impl Stream for ResourceToBodyAdapter { + type Item = Result<BufView, Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let this = self.get_mut(); + if let Some(mut fut) = this.1.take() { + match 
fut.poll_unpin(cx) { + Poll::Pending => { + this.1 = Some(fut); + Poll::Pending + } + Poll::Ready(res) => match res { + Ok(buf) if buf.is_empty() => Poll::Ready(None), + Ok(_) => { + this.1 = Some(this.0.clone().read(64 * 1024)); + Poll::Ready(Some(res)) + } + _ => Poll::Ready(Some(res)), + }, + } + } else { + Poll::Ready(None) + } + } +} + +impl Drop for ResourceToBodyAdapter { + fn drop(&mut self) { + self.0.clone().close() + } +} + #[op2] #[serde] #[allow(clippy::too_many_arguments)] @@ -226,8 +277,8 @@ pub fn op_fetch<FP>( #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] client_rid: Option<u32>, has_body: bool, - #[number] body_length: Option<u64>, #[buffer] data: Option<JsBuffer>, + #[smi] resource: Option<ResourceId>, ) -> Result<FetchReturn, AnyError> where FP: FetchPermissions + 'static, @@ -244,7 +295,7 @@ where // Check scheme before asking for net permission let scheme = url.scheme(); - let (request_rid, request_body_rid, cancel_handle_rid) = match scheme { + let (request_rid, cancel_handle_rid) = match scheme { "file" => { let path = url.to_file_path().map_err(|_| { type_error("NetworkError when attempting to fetch resource.") @@ -268,7 +319,7 @@ where let maybe_cancel_handle_rid = maybe_cancel_handle .map(|ch| state.resource_table.add(FetchCancelHandle(ch))); - (request_rid, None, maybe_cancel_handle_rid) + (request_rid, maybe_cancel_handle_rid) } "http" | "https" => { let permissions = state.borrow_mut::<FP>(); @@ -282,34 +333,25 @@ where let mut request = client.request(method.clone(), url); - let request_body_rid = if has_body { - match data { - None => { - // If no body is passed, we return a writer for streaming the body. - let (tx, stream) = tokio::sync::mpsc::channel(1); - - // If the size of the body is known, we include a content-length - // header explicitly. 
- if let Some(body_size) = body_length { - request = - request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) - } - - request = request.body(Body::wrap_stream(FetchBodyStream(stream))); - - let request_body_rid = - state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(Some(tx)), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) - } - Some(data) => { + if has_body { + match (data, resource) { + (Some(data), _) => { // If a body is passed, we use it, and don't return a body for streaming. request = request.body(data.to_vec()); - None } + (_, Some(resource)) => { + let resource = state.resource_table.take_any(resource)?; + match resource.size_hint() { + (body_size, Some(n)) if body_size == n && body_size > 0 => { + request = + request.header(CONTENT_LENGTH, HeaderValue::from(body_size)); + } + _ => {} + } + request = request + .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource))) + } + (None, None) => unreachable!(), } } else { // POST and PUT requests should always have a 0 length content-length, @@ -317,7 +359,6 @@ where if matches!(method, Method::POST | Method::PUT) { request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - None }; let mut header_map = HeaderMap::new(); @@ -354,7 +395,7 @@ where .send() .or_cancel(cancel_handle_) .await - .map(|res| res.map_err(|err| type_error(err.to_string()))) + .map(|res| res.map_err(|err| err.into())) }; let request_rid = state @@ -364,7 +405,7 @@ where let cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel_handle)); - (request_rid, request_body_rid, Some(cancel_handle_rid)) + (request_rid, Some(cancel_handle_rid)) } "data" => { let data_url = DataUrl::process(url.as_str()) @@ -385,7 +426,7 @@ where .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None, None) + (request_rid, None) } "blob" => { // Blob URL resolution happens in the JS side of fetch. 
If we got here is @@ -397,12 +438,11 @@ where Ok(FetchReturn { request_rid, - request_body_rid, cancel_handle_rid, }) } -#[derive(Serialize)] +#[derive(Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct FetchResponse { pub status: u16, @@ -413,6 +453,7 @@ pub struct FetchResponse { pub content_length: Option<u64>, pub remote_addr_ip: Option<String>, pub remote_addr_port: Option<u16>, + pub error: Option<String>, } #[op2(async)] @@ -432,7 +473,29 @@ pub async fn op_fetch_send( let res = match request.0.await { Ok(Ok(res)) => res, - Ok(Err(err)) => return Err(type_error(err.to_string())), + Ok(Err(err)) => { + // We're going to try and rescue the error cause from a stream and return it from this fetch. + // If any error in the chain is a reqwest body error, return that as a special result we can use to + // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). + // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead + let mut err_ref: &dyn std::error::Error = err.as_ref(); + while let Some(err) = std::error::Error::source(err_ref) { + if let Some(err) = err.downcast_ref::<reqwest::Error>() { + if err.is_body() { + // Extracts the next error cause and uses that for the message + if let Some(err) = std::error::Error::source(err) { + return Ok(FetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); + } + } + } + err_ref = err; + } + + return Err(type_error(err.to_string())); + } Err(_) => return Err(type_error("request was cancelled")), }; @@ -465,6 +528,7 @@ pub async fn op_fetch_send( content_length, remote_addr_ip, remote_addr_port, + error: None, }) } @@ -599,74 +663,6 @@ impl Resource for FetchCancelHandle { } } -/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`]. 
-pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>); - -impl Stream for FetchBodyStream { - type Item = Result<bytes::Bytes, Error>; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Self::Item>> { - self.0.poll_recv(cx) - } -} - -pub struct FetchRequestBodyResource { - pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>, - pub cancel: CancelHandle, -} - -impl Resource for FetchRequestBodyResource { - fn name(&self) -> Cow<str> { - "fetchRequestBody".into() - } - - fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> { - Box::pin(async move { - let bytes: bytes::Bytes = buf.into(); - let nwritten = bytes.len(); - let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - let body = (*body).as_ref(); - let cancel = RcRef::map(self, |r| &r.cancel); - let body = body.ok_or(type_error( - "request body receiver not connected (request closed)", - ))?; - body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; - Ok(WriteOutcome::Full { nwritten }) - }) - } - - fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> { - async move { - let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - let body = (*body).as_ref(); - let cancel = RcRef::map(self, |r| &r.cancel); - let body = body.ok_or(type_error( - "request body receiver not connected (request closed)", - ))?; - body.send(Err(error)).or_cancel(cancel).await??; - Ok(()) - } - .boxed_local() - } - - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { - async move { - let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - body.take(); - Ok(()) - } - .boxed_local() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel(); - } -} - type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; |