diff options
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 311 |
1 files changed, 110 insertions, 201 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 1343a9f56..75ceb86d9 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,7 +1,6 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. mod fs_fetch_handler; -mod proxy; use std::borrow::Cow; use std::cell::RefCell; @@ -15,7 +14,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use deno_core::anyhow::anyhow; +use bytes::Bytes; use deno_core::anyhow::Error; use deno_core::error::type_error; use deno_core::error::AnyError; @@ -43,38 +42,34 @@ use deno_core::ResourceId; use deno_tls::rustls::RootCertStore; use deno_tls::Proxy; use deno_tls::RootCertStoreProvider; + +use data_url::DataUrl; use deno_tls::TlsKey; use deno_tls::TlsKeys; use deno_tls::TlsKeysHolder; - -use bytes::Bytes; -use data_url::DataUrl; -use http::header::HeaderName; -use http::header::HeaderValue; -use http::header::ACCEPT_ENCODING; use http::header::CONTENT_LENGTH; -use http::header::HOST; -use http::header::PROXY_AUTHORIZATION; -use http::header::RANGE; -use http::header::USER_AGENT; -use http::Method; use http::Uri; -use http_body_util::BodyExt; -use hyper::body::Frame; -use hyper_rustls::HttpsConnector; -use hyper_util::client::legacy::connect::HttpConnector; -use hyper_util::rt::TokioExecutor; -use hyper_util::rt::TokioIo; -use hyper_util::rt::TokioTimer; +use reqwest::header::HeaderMap; +use reqwest::header::HeaderName; +use reqwest::header::HeaderValue; +use reqwest::header::ACCEPT_ENCODING; +use reqwest::header::HOST; +use reqwest::header::RANGE; +use reqwest::header::USER_AGENT; +use reqwest::redirect::Policy; +use reqwest::Body; +use reqwest::Client; +use reqwest::Method; +use reqwest::RequestBuilder; +use reqwest::Response; use serde::Deserialize; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tower::ServiceExt; -use tower_http::decompression::Decompression; -// Re-export data_url +// Re-export reqwest and data_url pub use data_url; +pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; @@ -83,9 +78,8 @@ pub struct Options { pub user_agent: String, pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>, pub proxy: Option<Proxy>, - #[allow(clippy::type_complexity)] pub request_builder_hook: - Option<fn(&mut http::Request<ReqBody>) -> Result<(), AnyError>>, + Option<fn(RequestBuilder) -> Result<RequestBuilder, AnyError>>, pub unsafely_ignore_certificate_errors: Option<Vec<String>>, pub client_cert_chain_and_key: TlsKeys, pub file_fetch_handler: Rc<dyn FetchHandler>, @@ -152,7 +146,7 @@ pub trait FetchHandler: dyn_clone::DynClone { fn fetch_file( &self, state: &mut OpState, - url: &Url, + url: Url, ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>); } @@ -166,7 +160,7 @@ impl FetchHandler for DefaultFileFetchHandler { fn fetch_file( &self, _state: &mut OpState, - _url: &Url, + _url: Url, ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) { let fut = async move { Ok(Err(type_error( @@ -189,20 +183,20 @@ pub struct FetchReturn { pub fn get_or_create_client_from_state( state: &mut OpState, -) -> Result<Client, AnyError> { - if let Some(client) = state.try_borrow::<Client>() { +) -> Result<reqwest::Client, AnyError> { + if let Some(client) = state.try_borrow::<reqwest::Client>() { Ok(client.clone()) } else { let options = state.borrow::<Options>(); let client = create_client_from_options(options)?; - state.put::<Client>(client.clone()); + state.put::<reqwest::Client>(client.clone()); Ok(client) } } pub fn create_client_from_options( options: &Options, -) -> Result<Client, AnyError> { +) -> Result<reqwest::Client, AnyError> { create_http_client( &options.user_agent, CreateHttpClientOptions { @@ -259,11 +253,11 @@ impl Stream for ResourceToBodyAdapter { } Poll::Ready(res) => match res { Ok(buf) if buf.is_empty() => Poll::Ready(None), - Ok(buf) => { + Ok(_) => { this.1 = Some(this.0.clone().read(64 * 1024)); - Poll::Ready(Some(Ok(buf.to_vec().into()))) + Poll::Ready(Some(res.map(|b| b.to_vec().into()))) } - Err(err) => Poll::Ready(Some(Err(err))), + _ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))), }, } } else { @@ -272,22 +266,6 @@ impl Stream for ResourceToBodyAdapter { } } -impl hyper::body::Body for ResourceToBodyAdapter { - type Data = Bytes; - type Error = Error; - - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { - match self.poll_next(cx) { - Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - impl Drop for ResourceToBodyAdapter { fn drop(&mut self) { self.0.clone().close() @@ -369,11 +347,9 @@ where file_fetch_handler, .. } = state.borrow_mut::<Options>(); let file_fetch_handler = file_fetch_handler.clone(); - let (future, maybe_cancel_handle) = - file_fetch_handler.fetch_file(state, &url); - let request_rid = state - .resource_table - .add(FetchRequestResource { future, url }); + let (request, maybe_cancel_handle) = + file_fetch_handler.fetch_file(state, url); + let request_rid = state.resource_table.add(FetchRequestResource(request)); let maybe_cancel_handle_rid = maybe_cancel_handle .map(|ch| state.resource_table.add(FetchCancelHandle(ch))); @@ -383,31 +359,31 @@ where let permissions = state.borrow_mut::<FP>(); permissions.check_net_url(&url, "fetch()")?; - let uri = url - .as_str() - .parse::<Uri>() - .map_err(|_| type_error("Invalid URL"))?; + // Make sure that we have a valid URI early, as reqwest's `RequestBuilder::send` + // internally uses `expect_uri`, which panics instead of returning a usable `Result`. + if url.as_str().parse::<Uri>().is_err() { + return Err(type_error("Invalid URL")); + } - let mut con_len = None; - let body = if has_body { + let mut request = client.request(method.clone(), url); + + if has_body { match (data, resource) { (Some(data), _) => { // If a body is passed, we use it, and don't return a body for streaming. - con_len = Some(data.len() as u64); - - http_body_util::Full::new(data.to_vec().into()) - .map_err(|never| match never {}) - .boxed() + request = request.body(data.to_vec()); } (_, Some(resource)) => { let resource = state.resource_table.take_any(resource)?; match resource.size_hint() { (body_size, Some(n)) if body_size == n && body_size > 0 => { - con_len = Some(body_size); + request = + request.header(CONTENT_LENGTH, HeaderValue::from(body_size)); } _ => {} } - ReqBody::new(ResourceToBodyAdapter::new(resource)) + request = request + .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource))) } (None, None) => unreachable!(), } @@ -415,21 +391,11 @@ where // POST and PUT requests should always have a 0 length content-length, // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch if matches!(method, Method::POST | Method::PUT) { - con_len = Some(0); + request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - http_body_util::Empty::new() - .map_err(|never| match never {}) - .boxed() }; - let mut request = http::Request::new(body); - *request.method_mut() = method.clone(); - *request.uri_mut() = uri; - - if let Some(len) = con_len { - request.headers_mut().insert(CONTENT_LENGTH, len.into()); - } - + let mut header_map = HeaderMap::new(); for (key, value) in headers { let name = HeaderName::from_bytes(&key) .map_err(|err| type_error(err.to_string()))?; @@ -437,34 +403,38 @@ where .map_err(|err| type_error(err.to_string()))?; if (name != HOST || allow_host) && name != CONTENT_LENGTH { - request.headers_mut().append(name, v); + header_map.append(name, v); } } - if request.headers().contains_key(RANGE) { + if header_map.contains_key(RANGE) { // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18 // If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`) - request - .headers_mut() + header_map .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity")); } + request = request.headers(header_map); let options = state.borrow::<Options>(); if let Some(request_builder_hook) = options.request_builder_hook { - request_builder_hook(&mut request) + request = request_builder_hook(request) .map_err(|err| type_error(err.to_string()))?; } let cancel_handle = CancelHandle::new_rc(); let cancel_handle_ = cancel_handle.clone(); - let fut = - async move { client.send(request).or_cancel(cancel_handle_).await }; + let fut = async move { + request + .send() + .or_cancel(cancel_handle_) + .await + .map(|res| res.map_err(|err| err.into())) + }; - let request_rid = state.resource_table.add(FetchRequestResource { - future: Box::pin(fut), - url, - }); + let request_rid = state + .resource_table + .add(FetchRequestResource(Box::pin(fut))); let cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel_handle)); @@ -478,21 +448,17 @@ where let (body, _) = data_url .decode_to_vec() .map_err(|e| type_error(format!("{e:?}")))?; - let body = http_body_util::Full::new(body.into()) - .map_err(|never| match never {}) - .boxed(); let response = http::Response::builder() .status(http::StatusCode::OK) .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) - .body(body)?; + .body(reqwest::Body::from(body))?; - let fut = async move { Ok(Ok(response)) }; + let fut = async move { Ok(Ok(Response::from(response))) }; - let request_rid = state.resource_table.add(FetchRequestResource { - future: Box::pin(fut), - url, - }); + let request_rid = state + .resource_table + .add(FetchRequestResource(Box::pin(fut))); (request_rid, None) } @@ -539,21 +505,24 @@ pub async fn op_fetch_send( .ok() .expect("multiple op_fetch_send ongoing"); - let res = match request.future.await { + let res = match request.0.await { Ok(Ok(res)) => res, Ok(Err(err)) => { // We're going to try and rescue the error cause from a stream and return it from this fetch. - // If any error in the chain is a hyper body error, return that as a special result we can use to + // If any error in the chain is a reqwest body error, return that as a special result we can use to // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead let mut err_ref: &dyn std::error::Error = err.as_ref(); while let Some(err) = std::error::Error::source(err_ref) { - if let Some(err) = err.downcast_ref::<hyper::Error>() { - if let Some(err) = std::error::Error::source(err) { - return Ok(FetchResponse { - error: Some(err.to_string()), - ..Default::default() - }); + if let Some(err) = err.downcast_ref::<reqwest::Error>() { + if err.is_body() { + // Extracts the next error cause and uses that for the message + if let Some(err) = std::error::Error::source(err) { + return Ok(FetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); + } } } err_ref = err; @@ -565,17 +534,14 @@ pub async fn op_fetch_send( }; let status = res.status(); - let url = request.url.into(); + let url = res.url().to_string(); let mut res_headers = Vec::new(); for (key, val) in res.headers().iter() { res_headers.push((key.as_str().into(), val.as_bytes().into())); } - let content_length = hyper::body::Body::size_hint(res.body()).exact(); - let remote_addr = res - .extensions() - .get::<hyper_util::client::legacy::connect::HttpInfo>() - .map(|info| info.remote_addr()); + let content_length = res.content_length(); + let remote_addr = res.remote_addr(); let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { (Some(addr.ip().to_string()), Some(addr.port())) } else { @@ -619,8 +585,7 @@ pub async fn op_fetch_response_upgrade( let upgraded = raw_response.upgrade().await?; { // Stage 3: Pump the data - let (mut upgraded_rx, mut upgraded_tx) = - tokio::io::split(TokioIo::new(upgraded)); + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); spawn(async move { let mut buf = [0; 1024]; @@ -708,13 +673,11 @@ impl Resource for UpgradeStream { } } -type CancelableResponseResult = - Result<Result<http::Response<ResBody>, AnyError>, Canceled>; +type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>; -pub struct FetchRequestResource { - pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>, - pub url: Url, -} +pub struct FetchRequestResource( + pub Pin<Box<dyn Future<Output = CancelableResponseResult>>>, +); impl Resource for FetchRequestResource { fn name(&self) -> Cow<str> { @@ -738,7 +701,7 @@ type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; pub enum FetchResponseReader { - Start(http::Response<ResBody>), + Start(Response), BodyReader(Peekable<BytesStream>), } @@ -756,7 +719,7 @@ pub struct FetchResponseResource { } impl FetchResponseResource { - pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self { + pub fn new(response: Response, size: Option<u64>) -> Self { Self { response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)), cancel: CancelHandle::default(), @@ -764,10 +727,10 @@ impl FetchResponseResource { } } - pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, AnyError> { + pub async fn upgrade(self) -> Result<reqwest::Upgraded, AnyError> { let reader = self.response_reader.into_inner(); match reader { - FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?), + FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?), _ => unreachable!(), } } @@ -791,12 +754,11 @@ impl Resource for FetchResponseResource { match std::mem::take(&mut *reader) { FetchResponseReader::Start(resp) => { - let stream: BytesStream = - Box::pin(resp.into_body().into_data_stream().map(|r| { - r.map_err(|err| { - std::io::Error::new(std::io::ErrorKind::Other, err) - }) - })); + let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| { + r.map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, err) + }) + })); *reader = FetchResponseReader::BodyReader(stream.peekable()); } FetchResponseReader::BodyReader(_) => unreachable!(), @@ -960,7 +922,7 @@ impl Default for CreateHttpClientOptions { } } -/// Create new instance of async Client. This client supports +/// Create new instance of async reqwest::Client. This client supports /// proxies and doesn't follow redirects. pub fn create_http_client( user_agent: &str, @@ -982,64 +944,43 @@ pub fn create_http_client( alpn_protocols.push("http/1.1".into()); } tls_config.alpn_protocols = alpn_protocols; - let tls_config = Arc::from(tls_config); - let mut http_connector = HttpConnector::new(); - http_connector.enforce_http(false); - let connector = HttpsConnector::from((http_connector, tls_config.clone())); + let mut headers = HeaderMap::new(); + headers.insert(USER_AGENT, user_agent.parse().unwrap()); + let mut builder = Client::builder() + .redirect(Policy::none()) + .default_headers(headers) + .use_preconfigured_tls(tls_config); - let user_agent = user_agent - .parse::<HeaderValue>() - .map_err(|_| type_error("illegal characters in User-Agent"))?; - - let mut builder = - hyper_util::client::legacy::Builder::new(TokioExecutor::new()); - builder.timer(TokioTimer::new()); - builder.pool_timer(TokioTimer::new()); - - let mut proxies = proxy::from_env(); if let Some(proxy) = options.proxy { - let mut intercept = proxy::Intercept::all(&proxy.url) - .ok_or_else(|| type_error("invalid proxy url"))?; + let mut reqwest_proxy = reqwest::Proxy::all(&proxy.url)?; if let Some(basic_auth) = &proxy.basic_auth { - intercept.set_auth(&basic_auth.username, &basic_auth.password); + reqwest_proxy = + reqwest_proxy.basic_auth(&basic_auth.username, &basic_auth.password); } - proxies.prepend(intercept); + builder = builder.proxy(reqwest_proxy); } - let proxies = Arc::new(proxies); - let mut connector = - proxy::ProxyConnector::new(proxies.clone(), connector, tls_config); - connector.user_agent(user_agent.clone()); if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host { - builder.pool_max_idle_per_host(pool_max_idle_per_host); + builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); } if let Some(pool_idle_timeout) = options.pool_idle_timeout { - builder.pool_idle_timeout( + builder = builder.pool_idle_timeout( pool_idle_timeout.map(std::time::Duration::from_millis), ); } match (options.http1, options.http2) { - (true, false) => {} // noop, handled by ALPN above - (false, true) => { - builder.http2_only(true); - } + (true, false) => builder = builder.http1_only(), + (false, true) => builder = builder.http2_prior_knowledge(), (true, true) => {} (false, false) => { return Err(type_error("Either `http1` or `http2` needs to be true")) } } - let pooled_client = builder.build(connector); - let decompress = Decompression::new(pooled_client).gzip(true).br(true); - - Ok(Client { - inner: decompress, - proxies, - user_agent, - }) + builder.build().map_err(|e| e.into()) } #[op2] @@ -1049,35 +990,3 @@ pub fn op_utf8_to_byte_string( ) -> Result<ByteString, AnyError> { Ok(input.into()) } - -#[derive(Clone, Debug)] -pub struct Client { - inner: Decompression<hyper_util::client::legacy::Client<Connector, ReqBody>>, - // Used to check whether to include a proxy-authorization header - proxies: Arc<proxy::Proxies>, - user_agent: HeaderValue, -} - -type Connector = proxy::ProxyConnector<HttpsConnector<HttpConnector>>; - -impl Client { - pub async fn send( - self, - mut req: http::Request<ReqBody>, - ) -> Result<http::Response<ResBody>, AnyError> { - req - .headers_mut() - .entry(USER_AGENT) - .or_insert_with(|| self.user_agent.clone()); - - if let Some(auth) = self.proxies.http_forward_auth(req.uri()) { - req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone()); - } - - let resp = self.inner.oneshot(req).await?; - Ok(resp.map(|b| b.map_err(|e| anyhow!(e)).boxed())) - } -} - -pub type ReqBody = http_body_util::combinators::BoxBody<Bytes, Error>; -pub type ResBody = http_body_util::combinators::BoxBody<Bytes, Error>; |