summaryrefslogtreecommitdiff
path: root/ext/fetch/lib.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/fetch/lib.rs
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r--ext/fetch/lib.rs212
1 files changed, 104 insertions, 108 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 7cde5584f..6e1ecb5e4 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -11,6 +11,8 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
@@ -21,13 +23,11 @@ use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::op2;
-use deno_core::BufView;
-use deno_core::WriteOutcome;
-
use deno_core::unsync::spawn;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -62,7 +62,6 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
-use tokio::sync::mpsc;
// Re-export reqwest and data_url
pub use data_url;
@@ -184,7 +183,6 @@ pub fn get_declaration() -> PathBuf {
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
pub request_rid: ResourceId,
- pub request_body_rid: Option<ResourceId>,
pub cancel_handle_rid: Option<ResourceId>,
}
@@ -216,6 +214,59 @@ pub fn get_or_create_client_from_state(
}
}
+#[allow(clippy::type_complexity)]
+pub struct ResourceToBodyAdapter(
+ Rc<dyn Resource>,
+ Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>,
+);
+
+impl ResourceToBodyAdapter {
+ pub fn new(resource: Rc<dyn Resource>) -> Self {
+ let future = resource.clone().read(64 * 1024);
+ Self(resource, Some(future))
+ }
+}
+
+// SAFETY: we only use this on a single-threaded executor
+unsafe impl Send for ResourceToBodyAdapter {}
+// SAFETY: we only use this on a single-threaded executor
+unsafe impl Sync for ResourceToBodyAdapter {}
+
+impl Stream for ResourceToBodyAdapter {
+ type Item = Result<BufView, Error>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+ if let Some(mut fut) = this.1.take() {
+ match fut.poll_unpin(cx) {
+ Poll::Pending => {
+ this.1 = Some(fut);
+ Poll::Pending
+ }
+ Poll::Ready(res) => match res {
+ Ok(buf) if buf.is_empty() => Poll::Ready(None),
+ Ok(_) => {
+ this.1 = Some(this.0.clone().read(64 * 1024));
+ Poll::Ready(Some(res))
+ }
+ _ => Poll::Ready(Some(res)),
+ },
+ }
+ } else {
+ Poll::Ready(None)
+ }
+ }
+}
+
+impl Drop for ResourceToBodyAdapter {
+ fn drop(&mut self) {
+ self.0.clone().close()
+ }
+}
+
#[op2]
#[serde]
#[allow(clippy::too_many_arguments)]
@@ -226,8 +277,8 @@ pub fn op_fetch<FP>(
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
has_body: bool,
- #[number] body_length: Option<u64>,
#[buffer] data: Option<JsBuffer>,
+ #[smi] resource: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
FP: FetchPermissions + 'static,
@@ -244,7 +295,7 @@ where
// Check scheme before asking for net permission
let scheme = url.scheme();
- let (request_rid, request_body_rid, cancel_handle_rid) = match scheme {
+ let (request_rid, cancel_handle_rid) = match scheme {
"file" => {
let path = url.to_file_path().map_err(|_| {
type_error("NetworkError when attempting to fetch resource.")
@@ -268,7 +319,7 @@ where
let maybe_cancel_handle_rid = maybe_cancel_handle
.map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
- (request_rid, None, maybe_cancel_handle_rid)
+ (request_rid, maybe_cancel_handle_rid)
}
"http" | "https" => {
let permissions = state.borrow_mut::<FP>();
@@ -282,34 +333,25 @@ where
let mut request = client.request(method.clone(), url);
- let request_body_rid = if has_body {
- match data {
- None => {
- // If no body is passed, we return a writer for streaming the body.
- let (tx, stream) = tokio::sync::mpsc::channel(1);
-
- // If the size of the body is known, we include a content-length
- // header explicitly.
- if let Some(body_size) = body_length {
- request =
- request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
- }
-
- request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
-
- let request_body_rid =
- state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(Some(tx)),
- cancel: CancelHandle::default(),
- });
-
- Some(request_body_rid)
- }
- Some(data) => {
+ if has_body {
+ match (data, resource) {
+ (Some(data), _) => {
// If a body is passed, we use it, and don't return a body for streaming.
request = request.body(data.to_vec());
- None
}
+ (_, Some(resource)) => {
+ let resource = state.resource_table.take_any(resource)?;
+ match resource.size_hint() {
+ (body_size, Some(n)) if body_size == n && body_size > 0 => {
+ request =
+ request.header(CONTENT_LENGTH, HeaderValue::from(body_size));
+ }
+ _ => {}
+ }
+ request = request
+ .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource)))
+ }
+ (None, None) => unreachable!(),
}
} else {
// POST and PUT requests should always have a 0 length content-length,
@@ -317,7 +359,6 @@ where
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
- None
};
let mut header_map = HeaderMap::new();
@@ -354,7 +395,7 @@ where
.send()
.or_cancel(cancel_handle_)
.await
- .map(|res| res.map_err(|err| type_error(err.to_string())))
+ .map(|res| res.map_err(|err| err.into()))
};
let request_rid = state
@@ -364,7 +405,7 @@ where
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
- (request_rid, request_body_rid, Some(cancel_handle_rid))
+ (request_rid, Some(cancel_handle_rid))
}
"data" => {
let data_url = DataUrl::process(url.as_str())
@@ -385,7 +426,7 @@ where
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, None, None)
+ (request_rid, None)
}
"blob" => {
// Blob URL resolution happens in the JS side of fetch. If we got here is
@@ -397,12 +438,11 @@ where
Ok(FetchReturn {
request_rid,
- request_body_rid,
cancel_handle_rid,
})
}
-#[derive(Serialize)]
+#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
pub status: u16,
@@ -413,6 +453,7 @@ pub struct FetchResponse {
pub content_length: Option<u64>,
pub remote_addr_ip: Option<String>,
pub remote_addr_port: Option<u16>,
+ pub error: Option<String>,
}
#[op2(async)]
@@ -432,7 +473,29 @@ pub async fn op_fetch_send(
let res = match request.0.await {
Ok(Ok(res)) => res,
- Ok(Err(err)) => return Err(type_error(err.to_string())),
+ Ok(Err(err)) => {
+ // We're going to try and rescue the error cause from a stream and return it from this fetch.
+ // If any error in the chain is a reqwest body error, return that as a special result we can use to
+ // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
+ // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
+ let mut err_ref: &dyn std::error::Error = err.as_ref();
+ while let Some(err) = std::error::Error::source(err_ref) {
+ if let Some(err) = err.downcast_ref::<reqwest::Error>() {
+ if err.is_body() {
+ // Extracts the next error cause and uses that for the message
+ if let Some(err) = std::error::Error::source(err) {
+ return Ok(FetchResponse {
+ error: Some(err.to_string()),
+ ..Default::default()
+ });
+ }
+ }
+ }
+ err_ref = err;
+ }
+
+ return Err(type_error(err.to_string()));
+ }
Err(_) => return Err(type_error("request was cancelled")),
};
@@ -465,6 +528,7 @@ pub async fn op_fetch_send(
content_length,
remote_addr_ip,
remote_addr_port,
+ error: None,
})
}
@@ -599,74 +663,6 @@ impl Resource for FetchCancelHandle {
}
}
-/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
-pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
-
-impl Stream for FetchBodyStream {
- type Item = Result<bytes::Bytes, Error>;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- self.0.poll_recv(cx)
- }
-}
-
-pub struct FetchRequestBodyResource {
- pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>,
- pub cancel: CancelHandle,
-}
-
-impl Resource for FetchRequestBodyResource {
- fn name(&self) -> Cow<str> {
- "fetchRequestBody".into()
- }
-
- fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
- Box::pin(async move {
- let bytes: bytes::Bytes = buf.into();
- let nwritten = bytes.len();
- let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
- let body = (*body).as_ref();
- let cancel = RcRef::map(self, |r| &r.cancel);
- let body = body.ok_or(type_error(
- "request body receiver not connected (request closed)",
- ))?;
- body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
- Ok(WriteOutcome::Full { nwritten })
- })
- }
-
- fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> {
- async move {
- let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
- let body = (*body).as_ref();
- let cancel = RcRef::map(self, |r| &r.cancel);
- let body = body.ok_or(type_error(
- "request body receiver not connected (request closed)",
- ))?;
- body.send(Err(error)).or_cancel(cancel).await??;
- Ok(())
- }
- .boxed_local()
- }
-
- fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
- async move {
- let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
- body.take();
- Ok(())
- }
- .boxed_local()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel();
- }
-}
-
type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;