summaryrefslogtreecommitdiff
path: root/ext/fetch/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r--ext/fetch/lib.rs311
1 files changed, 201 insertions, 110 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 75ceb86d9..1343a9f56 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
mod fs_fetch_handler;
+mod proxy;
use std::borrow::Cow;
use std::cell::RefCell;
@@ -14,7 +15,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
-use bytes::Bytes;
+use deno_core::anyhow::anyhow;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@@ -42,34 +43,38 @@ use deno_core::ResourceId;
use deno_tls::rustls::RootCertStore;
use deno_tls::Proxy;
use deno_tls::RootCertStoreProvider;
-
-use data_url::DataUrl;
use deno_tls::TlsKey;
use deno_tls::TlsKeys;
use deno_tls::TlsKeysHolder;
+
+use bytes::Bytes;
+use data_url::DataUrl;
+use http::header::HeaderName;
+use http::header::HeaderValue;
+use http::header::ACCEPT_ENCODING;
use http::header::CONTENT_LENGTH;
+use http::header::HOST;
+use http::header::PROXY_AUTHORIZATION;
+use http::header::RANGE;
+use http::header::USER_AGENT;
+use http::Method;
use http::Uri;
-use reqwest::header::HeaderMap;
-use reqwest::header::HeaderName;
-use reqwest::header::HeaderValue;
-use reqwest::header::ACCEPT_ENCODING;
-use reqwest::header::HOST;
-use reqwest::header::RANGE;
-use reqwest::header::USER_AGENT;
-use reqwest::redirect::Policy;
-use reqwest::Body;
-use reqwest::Client;
-use reqwest::Method;
-use reqwest::RequestBuilder;
-use reqwest::Response;
+use http_body_util::BodyExt;
+use hyper::body::Frame;
+use hyper_rustls::HttpsConnector;
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::rt::TokioExecutor;
+use hyper_util::rt::TokioIo;
+use hyper_util::rt::TokioTimer;
use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
+use tower::ServiceExt;
+use tower_http::decompression::Decompression;
-// Re-export reqwest and data_url
+// Re-export data_url
pub use data_url;
-pub use reqwest;
pub use fs_fetch_handler::FsFetchHandler;
@@ -78,8 +83,9 @@ pub struct Options {
pub user_agent: String,
pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
pub proxy: Option<Proxy>,
+ #[allow(clippy::type_complexity)]
pub request_builder_hook:
- Option<fn(RequestBuilder) -> Result<RequestBuilder, AnyError>>,
+ Option<fn(&mut http::Request<ReqBody>) -> Result<(), AnyError>>,
pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
pub client_cert_chain_and_key: TlsKeys,
pub file_fetch_handler: Rc<dyn FetchHandler>,
@@ -146,7 +152,7 @@ pub trait FetchHandler: dyn_clone::DynClone {
fn fetch_file(
&self,
state: &mut OpState,
- url: Url,
+ url: &Url,
) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
}
@@ -160,7 +166,7 @@ impl FetchHandler for DefaultFileFetchHandler {
fn fetch_file(
&self,
_state: &mut OpState,
- _url: Url,
+ _url: &Url,
) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
let fut = async move {
Ok(Err(type_error(
@@ -183,20 +189,20 @@ pub struct FetchReturn {
pub fn get_or_create_client_from_state(
state: &mut OpState,
-) -> Result<reqwest::Client, AnyError> {
- if let Some(client) = state.try_borrow::<reqwest::Client>() {
+) -> Result<Client, AnyError> {
+ if let Some(client) = state.try_borrow::<Client>() {
Ok(client.clone())
} else {
let options = state.borrow::<Options>();
let client = create_client_from_options(options)?;
- state.put::<reqwest::Client>(client.clone());
+ state.put::<Client>(client.clone());
Ok(client)
}
}
pub fn create_client_from_options(
options: &Options,
-) -> Result<reqwest::Client, AnyError> {
+) -> Result<Client, AnyError> {
create_http_client(
&options.user_agent,
CreateHttpClientOptions {
@@ -253,11 +259,11 @@ impl Stream for ResourceToBodyAdapter {
}
Poll::Ready(res) => match res {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
- Ok(_) => {
+ Ok(buf) => {
this.1 = Some(this.0.clone().read(64 * 1024));
- Poll::Ready(Some(res.map(|b| b.to_vec().into())))
+ Poll::Ready(Some(Ok(buf.to_vec().into())))
}
- _ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))),
+ Err(err) => Poll::Ready(Some(Err(err))),
},
}
} else {
@@ -266,6 +272,22 @@ impl Stream for ResourceToBodyAdapter {
}
}
+impl hyper::body::Body for ResourceToBodyAdapter {
+ type Data = Bytes;
+ type Error = Error;
+
+ fn poll_frame(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ match self.poll_next(cx) {
+ Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
+ Poll::Ready(None) => Poll::Ready(None),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
impl Drop for ResourceToBodyAdapter {
fn drop(&mut self) {
self.0.clone().close()
@@ -347,9 +369,11 @@ where
file_fetch_handler, ..
} = state.borrow_mut::<Options>();
let file_fetch_handler = file_fetch_handler.clone();
- let (request, maybe_cancel_handle) =
- file_fetch_handler.fetch_file(state, url);
- let request_rid = state.resource_table.add(FetchRequestResource(request));
+ let (future, maybe_cancel_handle) =
+ file_fetch_handler.fetch_file(state, &url);
+ let request_rid = state
+ .resource_table
+ .add(FetchRequestResource { future, url });
let maybe_cancel_handle_rid = maybe_cancel_handle
.map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
@@ -359,31 +383,31 @@ where
let permissions = state.borrow_mut::<FP>();
permissions.check_net_url(&url, "fetch()")?;
- // Make sure that we have a valid URI early, as reqwest's `RequestBuilder::send`
- // internally uses `expect_uri`, which panics instead of returning a usable `Result`.
- if url.as_str().parse::<Uri>().is_err() {
- return Err(type_error("Invalid URL"));
- }
+ let uri = url
+ .as_str()
+ .parse::<Uri>()
+ .map_err(|_| type_error("Invalid URL"))?;
- let mut request = client.request(method.clone(), url);
-
- if has_body {
+ let mut con_len = None;
+ let body = if has_body {
match (data, resource) {
(Some(data), _) => {
// If a body is passed, we use it, and don't return a body for streaming.
- request = request.body(data.to_vec());
+ con_len = Some(data.len() as u64);
+
+ http_body_util::Full::new(data.to_vec().into())
+ .map_err(|never| match never {})
+ .boxed()
}
(_, Some(resource)) => {
let resource = state.resource_table.take_any(resource)?;
match resource.size_hint() {
(body_size, Some(n)) if body_size == n && body_size > 0 => {
- request =
- request.header(CONTENT_LENGTH, HeaderValue::from(body_size));
+ con_len = Some(body_size);
}
_ => {}
}
- request = request
- .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource)))
+ ReqBody::new(ResourceToBodyAdapter::new(resource))
}
(None, None) => unreachable!(),
}
@@ -391,11 +415,21 @@ where
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
- request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
+ con_len = Some(0);
}
+ http_body_util::Empty::new()
+ .map_err(|never| match never {})
+ .boxed()
};
- let mut header_map = HeaderMap::new();
+ let mut request = http::Request::new(body);
+ *request.method_mut() = method.clone();
+ *request.uri_mut() = uri;
+
+ if let Some(len) = con_len {
+ request.headers_mut().insert(CONTENT_LENGTH, len.into());
+ }
+
for (key, value) in headers {
let name = HeaderName::from_bytes(&key)
.map_err(|err| type_error(err.to_string()))?;
@@ -403,38 +437,34 @@ where
.map_err(|err| type_error(err.to_string()))?;
if (name != HOST || allow_host) && name != CONTENT_LENGTH {
- header_map.append(name, v);
+ request.headers_mut().append(name, v);
}
}
- if header_map.contains_key(RANGE) {
+ if request.headers().contains_key(RANGE) {
// https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18
// If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`)
- header_map
+ request
+ .headers_mut()
.insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
}
- request = request.headers(header_map);
let options = state.borrow::<Options>();
if let Some(request_builder_hook) = options.request_builder_hook {
- request = request_builder_hook(request)
+ request_builder_hook(&mut request)
.map_err(|err| type_error(err.to_string()))?;
}
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
- let fut = async move {
- request
- .send()
- .or_cancel(cancel_handle_)
- .await
- .map(|res| res.map_err(|err| err.into()))
- };
+ let fut =
+ async move { client.send(request).or_cancel(cancel_handle_).await };
- let request_rid = state
- .resource_table
- .add(FetchRequestResource(Box::pin(fut)));
+ let request_rid = state.resource_table.add(FetchRequestResource {
+ future: Box::pin(fut),
+ url,
+ });
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
@@ -448,17 +478,21 @@ where
let (body, _) = data_url
.decode_to_vec()
.map_err(|e| type_error(format!("{e:?}")))?;
+ let body = http_body_util::Full::new(body.into())
+ .map_err(|never| match never {})
+ .boxed();
let response = http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
- .body(reqwest::Body::from(body))?;
+ .body(body)?;
- let fut = async move { Ok(Ok(Response::from(response))) };
+ let fut = async move { Ok(Ok(response)) };
- let request_rid = state
- .resource_table
- .add(FetchRequestResource(Box::pin(fut)));
+ let request_rid = state.resource_table.add(FetchRequestResource {
+ future: Box::pin(fut),
+ url,
+ });
(request_rid, None)
}
@@ -505,24 +539,21 @@ pub async fn op_fetch_send(
.ok()
.expect("multiple op_fetch_send ongoing");
- let res = match request.0.await {
+ let res = match request.future.await {
Ok(Ok(res)) => res,
Ok(Err(err)) => {
// We're going to try and rescue the error cause from a stream and return it from this fetch.
- // If any error in the chain is a reqwest body error, return that as a special result we can use to
+ // If any error in the chain is a hyper body error, return that as a special result we can use to
// reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
// TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
let mut err_ref: &dyn std::error::Error = err.as_ref();
while let Some(err) = std::error::Error::source(err_ref) {
- if let Some(err) = err.downcast_ref::<reqwest::Error>() {
- if err.is_body() {
- // Extracts the next error cause and uses that for the message
- if let Some(err) = std::error::Error::source(err) {
- return Ok(FetchResponse {
- error: Some(err.to_string()),
- ..Default::default()
- });
- }
+ if let Some(err) = err.downcast_ref::<hyper::Error>() {
+ if let Some(err) = std::error::Error::source(err) {
+ return Ok(FetchResponse {
+ error: Some(err.to_string()),
+ ..Default::default()
+ });
}
}
err_ref = err;
@@ -534,14 +565,17 @@ pub async fn op_fetch_send(
};
let status = res.status();
- let url = res.url().to_string();
+ let url = request.url.into();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.as_str().into(), val.as_bytes().into()));
}
- let content_length = res.content_length();
- let remote_addr = res.remote_addr();
+ let content_length = hyper::body::Body::size_hint(res.body()).exact();
+ let remote_addr = res
+ .extensions()
+ .get::<hyper_util::client::legacy::connect::HttpInfo>()
+ .map(|info| info.remote_addr());
let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr {
(Some(addr.ip().to_string()), Some(addr.port()))
} else {
@@ -585,7 +619,8 @@ pub async fn op_fetch_response_upgrade(
let upgraded = raw_response.upgrade().await?;
{
// Stage 3: Pump the data
- let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
+ let (mut upgraded_rx, mut upgraded_tx) =
+ tokio::io::split(TokioIo::new(upgraded));
spawn(async move {
let mut buf = [0; 1024];
@@ -673,11 +708,13 @@ impl Resource for UpgradeStream {
}
}
-type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
+type CancelableResponseResult =
+ Result<Result<http::Response<ResBody>, AnyError>, Canceled>;
-pub struct FetchRequestResource(
- pub Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
-);
+pub struct FetchRequestResource {
+ pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
+ pub url: Url,
+}
impl Resource for FetchRequestResource {
fn name(&self) -> Cow<str> {
@@ -701,7 +738,7 @@ type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
pub enum FetchResponseReader {
- Start(Response),
+ Start(http::Response<ResBody>),
BodyReader(Peekable<BytesStream>),
}
@@ -719,7 +756,7 @@ pub struct FetchResponseResource {
}
impl FetchResponseResource {
- pub fn new(response: Response, size: Option<u64>) -> Self {
+ pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
Self {
response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
cancel: CancelHandle::default(),
@@ -727,10 +764,10 @@ impl FetchResponseResource {
}
}
- pub async fn upgrade(self) -> Result<reqwest::Upgraded, AnyError> {
+ pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, AnyError> {
let reader = self.response_reader.into_inner();
match reader {
- FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?),
+ FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
_ => unreachable!(),
}
}
@@ -754,11 +791,12 @@ impl Resource for FetchResponseResource {
match std::mem::take(&mut *reader) {
FetchResponseReader::Start(resp) => {
- let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| {
- r.map_err(|err| {
- std::io::Error::new(std::io::ErrorKind::Other, err)
- })
- }));
+ let stream: BytesStream =
+ Box::pin(resp.into_body().into_data_stream().map(|r| {
+ r.map_err(|err| {
+ std::io::Error::new(std::io::ErrorKind::Other, err)
+ })
+ }));
*reader = FetchResponseReader::BodyReader(stream.peekable());
}
FetchResponseReader::BodyReader(_) => unreachable!(),
@@ -922,7 +960,7 @@ impl Default for CreateHttpClientOptions {
}
}
-/// Create new instance of async reqwest::Client. This client supports
+/// Create new instance of async Client. This client supports
/// proxies and doesn't follow redirects.
pub fn create_http_client(
user_agent: &str,
@@ -944,43 +982,64 @@ pub fn create_http_client(
alpn_protocols.push("http/1.1".into());
}
tls_config.alpn_protocols = alpn_protocols;
+ let tls_config = Arc::from(tls_config);
- let mut headers = HeaderMap::new();
- headers.insert(USER_AGENT, user_agent.parse().unwrap());
- let mut builder = Client::builder()
- .redirect(Policy::none())
- .default_headers(headers)
- .use_preconfigured_tls(tls_config);
+ let mut http_connector = HttpConnector::new();
+ http_connector.enforce_http(false);
+ let connector = HttpsConnector::from((http_connector, tls_config.clone()));
+ let user_agent = user_agent
+ .parse::<HeaderValue>()
+ .map_err(|_| type_error("illegal characters in User-Agent"))?;
+
+ let mut builder =
+ hyper_util::client::legacy::Builder::new(TokioExecutor::new());
+ builder.timer(TokioTimer::new());
+ builder.pool_timer(TokioTimer::new());
+
+ let mut proxies = proxy::from_env();
if let Some(proxy) = options.proxy {
- let mut reqwest_proxy = reqwest::Proxy::all(&proxy.url)?;
+ let mut intercept = proxy::Intercept::all(&proxy.url)
+ .ok_or_else(|| type_error("invalid proxy url"))?;
if let Some(basic_auth) = &proxy.basic_auth {
- reqwest_proxy =
- reqwest_proxy.basic_auth(&basic_auth.username, &basic_auth.password);
+ intercept.set_auth(&basic_auth.username, &basic_auth.password);
}
- builder = builder.proxy(reqwest_proxy);
+ proxies.prepend(intercept);
}
+ let proxies = Arc::new(proxies);
+ let mut connector =
+ proxy::ProxyConnector::new(proxies.clone(), connector, tls_config);
+ connector.user_agent(user_agent.clone());
if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
- builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
+ builder.pool_max_idle_per_host(pool_max_idle_per_host);
}
if let Some(pool_idle_timeout) = options.pool_idle_timeout {
- builder = builder.pool_idle_timeout(
+ builder.pool_idle_timeout(
pool_idle_timeout.map(std::time::Duration::from_millis),
);
}
match (options.http1, options.http2) {
- (true, false) => builder = builder.http1_only(),
- (false, true) => builder = builder.http2_prior_knowledge(),
+ (true, false) => {} // noop, handled by ALPN above
+ (false, true) => {
+ builder.http2_only(true);
+ }
(true, true) => {}
(false, false) => {
return Err(type_error("Either `http1` or `http2` needs to be true"))
}
}
- builder.build().map_err(|e| e.into())
+ let pooled_client = builder.build(connector);
+ let decompress = Decompression::new(pooled_client).gzip(true).br(true);
+
+ Ok(Client {
+ inner: decompress,
+ proxies,
+ user_agent,
+ })
}
#[op2]
@@ -990,3 +1049,35 @@ pub fn op_utf8_to_byte_string(
) -> Result<ByteString, AnyError> {
Ok(input.into())
}
+
+#[derive(Clone, Debug)]
+pub struct Client {
+ inner: Decompression<hyper_util::client::legacy::Client<Connector, ReqBody>>,
+ // Used to check whether to include a proxy-authorization header
+ proxies: Arc<proxy::Proxies>,
+ user_agent: HeaderValue,
+}
+
+type Connector = proxy::ProxyConnector<HttpsConnector<HttpConnector>>;
+
+impl Client {
+ pub async fn send(
+ self,
+ mut req: http::Request<ReqBody>,
+ ) -> Result<http::Response<ResBody>, AnyError> {
+ req
+ .headers_mut()
+ .entry(USER_AGENT)
+ .or_insert_with(|| self.user_agent.clone());
+
+ if let Some(auth) = self.proxies.http_forward_auth(req.uri()) {
+ req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
+ }
+
+ let resp = self.inner.oneshot(req).await?;
+ Ok(resp.map(|b| b.map_err(|e| anyhow!(e)).boxed()))
+ }
+}
+
+pub type ReqBody = http_body_util::combinators::BoxBody<Bytes, Error>;
+pub type ResBody = http_body_util::combinators::BoxBody<Bytes, Error>;