diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/fetch/lib.rs | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 212 |
1 files changed, 104 insertions, 108 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 7cde5584f..6e1ecb5e4 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -11,6 +11,8 @@ use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use deno_core::anyhow::Error; use deno_core::error::type_error; @@ -21,13 +23,11 @@ use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::op2; -use deno_core::BufView; -use deno_core::WriteOutcome; - use deno_core::unsync::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -62,7 +62,6 @@ use serde::Deserialize; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::sync::mpsc; // Re-export reqwest and data_url pub use data_url; @@ -184,7 +183,6 @@ pub fn get_declaration() -> PathBuf { #[serde(rename_all = "camelCase")] pub struct FetchReturn { pub request_rid: ResourceId, - pub request_body_rid: Option<ResourceId>, pub cancel_handle_rid: Option<ResourceId>, } @@ -216,6 +214,59 @@ pub fn get_or_create_client_from_state( } } +#[allow(clippy::type_complexity)] +pub struct ResourceToBodyAdapter( + Rc<dyn Resource>, + Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>, +); + +impl ResourceToBodyAdapter { + pub fn new(resource: Rc<dyn Resource>) -> Self { + let future = resource.clone().read(64 * 1024); + Self(resource, Some(future)) + } +} + +// SAFETY: we only use this on a single-threaded executor +unsafe impl Send for ResourceToBodyAdapter {} +// SAFETY: we only use this on a single-threaded executor +unsafe impl Sync for ResourceToBodyAdapter {} + +impl Stream for ResourceToBodyAdapter { + type Item = Result<BufView, Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let this = self.get_mut(); + if let Some(mut fut) = this.1.take() { + match fut.poll_unpin(cx) { + Poll::Pending => { + this.1 = Some(fut); + Poll::Pending + } + Poll::Ready(res) => match res { + Ok(buf) if buf.is_empty() => Poll::Ready(None), + Ok(_) => { + this.1 = Some(this.0.clone().read(64 * 1024)); + Poll::Ready(Some(res)) + } + _ => Poll::Ready(Some(res)), + }, + } + } else { + Poll::Ready(None) + } + } +} + +impl Drop for ResourceToBodyAdapter { + fn drop(&mut self) { + self.0.clone().close() + } +} + #[op2] #[serde] #[allow(clippy::too_many_arguments)] @@ -226,8 +277,8 @@ pub fn op_fetch<FP>( #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] client_rid: Option<u32>, has_body: bool, - #[number] body_length: Option<u64>, #[buffer] data: Option<JsBuffer>, + #[smi] resource: Option<ResourceId>, ) -> Result<FetchReturn, AnyError> where FP: FetchPermissions + 'static, @@ -244,7 +295,7 @@ where // Check scheme before asking for net permission let scheme = url.scheme(); - let (request_rid, request_body_rid, cancel_handle_rid) = match scheme { + let (request_rid, cancel_handle_rid) = match scheme { "file" => { let path = url.to_file_path().map_err(|_| { type_error("NetworkError when attempting to fetch resource.") @@ -268,7 +319,7 @@ where let maybe_cancel_handle_rid = maybe_cancel_handle .map(|ch| state.resource_table.add(FetchCancelHandle(ch))); - (request_rid, None, maybe_cancel_handle_rid) + (request_rid, maybe_cancel_handle_rid) } "http" | "https" => { let permissions = state.borrow_mut::<FP>(); @@ -282,34 +333,25 @@ where let mut request = client.request(method.clone(), url); - let request_body_rid = if has_body { - match data { - None => { - // If no body is passed, we return a writer for streaming the body. - let (tx, stream) = tokio::sync::mpsc::channel(1); - - // If the size of the body is known, we include a content-length - // header explicitly. - if let Some(body_size) = body_length { - request = - request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) - } - - request = request.body(Body::wrap_stream(FetchBodyStream(stream))); - - let request_body_rid = - state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(Some(tx)), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) - } - Some(data) => { + if has_body { + match (data, resource) { + (Some(data), _) => { // If a body is passed, we use it, and don't return a body for streaming. request = request.body(data.to_vec()); - None } + (_, Some(resource)) => { + let resource = state.resource_table.take_any(resource)?; + match resource.size_hint() { + (body_size, Some(n)) if body_size == n && body_size > 0 => { + request = + request.header(CONTENT_LENGTH, HeaderValue::from(body_size)); + } + _ => {} + } + request = request + .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource))) + } + (None, None) => unreachable!(), } } else { // POST and PUT requests should always have a 0 length content-length, @@ -317,7 +359,6 @@ where if matches!(method, Method::POST | Method::PUT) { request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - None }; let mut header_map = HeaderMap::new(); @@ -354,7 +395,7 @@ where .send() .or_cancel(cancel_handle_) .await - .map(|res| res.map_err(|err| type_error(err.to_string()))) + .map(|res| res.map_err(|err| err.into())) }; let request_rid = state @@ -364,7 +405,7 @@ where let cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel_handle)); - (request_rid, request_body_rid, Some(cancel_handle_rid)) + (request_rid, Some(cancel_handle_rid)) } "data" => { let data_url = DataUrl::process(url.as_str()) @@ -385,7 +426,7 @@ where .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None, None) + (request_rid, None) } "blob" => { // Blob URL resolution happens in the JS side of fetch. If we got here is @@ -397,12 +438,11 @@ where Ok(FetchReturn { request_rid, - request_body_rid, cancel_handle_rid, }) } -#[derive(Serialize)] +#[derive(Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct FetchResponse { pub status: u16, @@ -413,6 +453,7 @@ pub struct FetchResponse { pub content_length: Option<u64>, pub remote_addr_ip: Option<String>, pub remote_addr_port: Option<u16>, + pub error: Option<String>, } #[op2(async)] @@ -432,7 +473,29 @@ pub async fn op_fetch_send( let res = match request.0.await { Ok(Ok(res)) => res, - Ok(Err(err)) => return Err(type_error(err.to_string())), + Ok(Err(err)) => { + // We're going to try and rescue the error cause from a stream and return it from this fetch. + // If any error in the chain is a reqwest body error, return that as a special result we can use to + // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). + // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead + let mut err_ref: &dyn std::error::Error = err.as_ref(); + while let Some(err) = std::error::Error::source(err_ref) { + if let Some(err) = err.downcast_ref::<reqwest::Error>() { + if err.is_body() { + // Extracts the next error cause and uses that for the message + if let Some(err) = std::error::Error::source(err) { + return Ok(FetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); + } + } + } + err_ref = err; + } + + return Err(type_error(err.to_string())); + } Err(_) => return Err(type_error("request was cancelled")), }; @@ -465,6 +528,7 @@ pub async fn op_fetch_send( content_length, remote_addr_ip, remote_addr_port, + error: None, }) } @@ -599,74 +663,6 @@ impl Resource for FetchCancelHandle { } } -/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`]. -pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>); - -impl Stream for FetchBodyStream { - type Item = Result<bytes::Bytes, Error>; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Self::Item>> { - self.0.poll_recv(cx) - } -} - -pub struct FetchRequestBodyResource { - pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>, - pub cancel: CancelHandle, -} - -impl Resource for FetchRequestBodyResource { - fn name(&self) -> Cow<str> { - "fetchRequestBody".into() - } - - fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> { - Box::pin(async move { - let bytes: bytes::Bytes = buf.into(); - let nwritten = bytes.len(); - let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - let body = (*body).as_ref(); - let cancel = RcRef::map(self, |r| &r.cancel); - let body = body.ok_or(type_error( - "request body receiver not connected (request closed)", - ))?; - body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; - Ok(WriteOutcome::Full { nwritten }) - }) - } - - fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> { - async move { - let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - let body = (*body).as_ref(); - let cancel = RcRef::map(self, |r| &r.cancel); - let body = body.ok_or(type_error( - "request body receiver not connected (request closed)", - ))?; - body.send(Err(error)).or_cancel(cancel).await??; - Ok(()) - } - .boxed_local() - } - - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { - async move { - let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - body.take(); - Ok(()) - } - .boxed_local() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel(); - } -} - type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; |