diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2021-08-11 12:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 12:27:05 +0200 |
commit | a0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch) | |
tree | 90671b004537e20f9493fd3277ffd21d30b39a0e /ext/fetch/lib.rs | |
parent | 3a6994115176781b3a93d70794b1b81bc95e42b4 (diff) |
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 567 |
1 files changed, 567 insertions, 0 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs new file mode 100644 index 000000000..e89df470a --- /dev/null +++ b/ext/fetch/lib.rs @@ -0,0 +1,567 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use data_url::DataUrl; +use deno_core::error::bad_resource_id; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::Future; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::include_js_files; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::url::Url; +use deno_core::AsyncRefCell; +use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::Canceled; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use deno_tls::create_http_client; +use deno_tls::rustls::RootCertStore; +use deno_tls::Proxy; +use deno_web::BlobStore; +use http::header::CONTENT_LENGTH; +use reqwest::header::HeaderName; +use reqwest::header::HeaderValue; +use reqwest::header::HOST; +use reqwest::Body; +use reqwest::Client; +use reqwest::Method; +use reqwest::RequestBuilder; +use reqwest::Response; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::convert::From; +use std::fs::File; +use std::io::Read; +use std::path::Path; +use std::path::PathBuf; +use std::pin::Pin; +use std::rc::Rc; +use tokio::io::AsyncReadExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::io::StreamReader; + +pub use reqwest; // Re-export reqwest + +pub fn init<P: FetchPermissions + 'static>( + user_agent: String, + root_cert_store: Option<RootCertStore>, + proxy: Option<Proxy>, + request_builder_hook: Option<fn(RequestBuilder) -> RequestBuilder>, + unsafely_ignore_certificate_errors: Option<Vec<String>>, +) -> Extension { + Extension::builder() + .js(include_js_files!( + prefix "deno:ext/fetch", + "01_fetch_util.js", + "20_headers.js", + "21_formdata.js", + "22_body.js", + "22_http_client.js", + "23_request.js", + "23_response.js", + "26_fetch.js", + )) + .ops(vec![ + ("op_fetch", op_sync(op_fetch::<P>)), + ("op_fetch_send", op_async(op_fetch_send)), + ("op_fetch_request_write", op_async(op_fetch_request_write)), + ("op_fetch_response_read", op_async(op_fetch_response_read)), + ("op_create_http_client", op_sync(op_create_http_client::<P>)), + ]) + .state(move |state| { + state.put::<reqwest::Client>({ + create_http_client( + user_agent.clone(), + root_cert_store.clone(), + None, + proxy.clone(), + unsafely_ignore_certificate_errors.clone(), + ) + .unwrap() + }); + state.put::<HttpClientDefaults>(HttpClientDefaults { + user_agent: user_agent.clone(), + root_cert_store: root_cert_store.clone(), + proxy: proxy.clone(), + request_builder_hook, + unsafely_ignore_certificate_errors: unsafely_ignore_certificate_errors + .clone(), + }); + Ok(()) + }) + .build() +} + +pub struct HttpClientDefaults { + pub user_agent: String, + pub root_cert_store: Option<RootCertStore>, + pub proxy: Option<Proxy>, + pub request_builder_hook: Option<fn(RequestBuilder) -> RequestBuilder>, + pub unsafely_ignore_certificate_errors: Option<Vec<String>>, +} + +pub trait FetchPermissions { + fn check_net_url(&mut self, _url: &Url) -> Result<(), AnyError>; + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>; +} + +/// For use with `op_fetch` when the user does not want permissions. +pub struct NoFetchPermissions; + +impl FetchPermissions for NoFetchPermissions { + fn check_net_url(&mut self, _url: &Url) -> Result<(), AnyError> { + Ok(()) + } + + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_fetch.d.ts") +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FetchArgs { + method: ByteString, + url: String, + headers: Vec<(ByteString, ByteString)>, + client_rid: Option<u32>, + has_body: bool, + body_length: Option<u64>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FetchReturn { + request_rid: ResourceId, + request_body_rid: Option<ResourceId>, + cancel_handle_rid: Option<ResourceId>, +} + +pub fn op_fetch<FP>( + state: &mut OpState, + args: FetchArgs, + data: Option<ZeroCopyBuf>, +) -> Result<FetchReturn, AnyError> +where + FP: FetchPermissions + 'static, +{ + let client = if let Some(rid) = args.client_rid { + let r = state + .resource_table + .get::<HttpClientResource>(rid) + .ok_or_else(bad_resource_id)?; + r.client.clone() + } else { + let client = state.borrow::<reqwest::Client>(); + client.clone() + }; + + let method = Method::from_bytes(&args.method)?; + let url = Url::parse(&args.url)?; + + // Check scheme before asking for net permission + let scheme = url.scheme(); + let (request_rid, request_body_rid, cancel_handle_rid) = match scheme { + "http" | "https" => { + let permissions = state.borrow_mut::<FP>(); + permissions.check_net_url(&url)?; + + let mut request = client.request(method, url); + + let request_body_rid = if args.has_body { + match data { + None => { + // If no body is passed, we return a writer for streaming the body. + let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); + + // If the size of the body is known, we include a content-length + // header explicitly. + if let Some(body_size) = args.body_length { + request = + request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) + } + + request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); + + let request_body_rid = + state.resource_table.add(FetchRequestBodyResource { + body: AsyncRefCell::new(tx), + cancel: CancelHandle::default(), + }); + + Some(request_body_rid) + } + Some(data) => { + // If a body is passed, we use it, and don't return a body for streaming. + request = request.body(Vec::from(&*data)); + None + } + } + } else { + None + }; + + for (key, value) in args.headers { + let name = HeaderName::from_bytes(&key).unwrap(); + let v = HeaderValue::from_bytes(&value).unwrap(); + if name != HOST { + request = request.header(name, v); + } + } + + let defaults = state.borrow::<HttpClientDefaults>(); + if let Some(request_builder_hook) = defaults.request_builder_hook { + request = request_builder_hook(request); + } + + let cancel_handle = CancelHandle::new_rc(); + let cancel_handle_ = cancel_handle.clone(); + + let fut = async move { + request + .send() + .or_cancel(cancel_handle_) + .await + .map(|res| res.map_err(|err| type_error(err.to_string()))) + }; + + let request_rid = state + .resource_table + .add(FetchRequestResource(Box::pin(fut))); + + let cancel_handle_rid = + state.resource_table.add(FetchCancelHandle(cancel_handle)); + + (request_rid, request_body_rid, Some(cancel_handle_rid)) + } + "data" => { + let data_url = DataUrl::process(url.as_str()) + .map_err(|e| type_error(format!("{:?}", e)))?; + + let (body, _) = data_url + .decode_to_vec() + .map_err(|e| type_error(format!("{:?}", e)))?; + + let response = http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) + .body(reqwest::Body::from(body))?; + + let fut = async move { Ok(Ok(Response::from(response))) }; + + let request_rid = state + .resource_table + .add(FetchRequestResource(Box::pin(fut))); + + (request_rid, None, None) + } + "blob" => { + let blob_store = state.try_borrow::<BlobStore>().ok_or_else(|| { + type_error("Blob URLs are not supported in this context.") + })?; + + let blob = blob_store + .get_object_url(url)? + .ok_or_else(|| type_error("Blob for the given URL not found."))?; + + if method != "GET" { + return Err(type_error("Blob URL fetch only supports GET method.")); + } + + let cancel_handle = CancelHandle::new_rc(); + let cancel_handle_ = cancel_handle.clone(); + + let fut = async move { + // TODO(lucacsonato): this should be a stream! + let chunk = match blob.read_all().or_cancel(cancel_handle_).await? { + Ok(chunk) => chunk, + Err(err) => return Ok(Err(err)), + }; + + let res = http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_LENGTH, chunk.len()) + .header(http::header::CONTENT_TYPE, blob.media_type.clone()) + .body(reqwest::Body::from(chunk)) + .map_err(|err| type_error(err.to_string())); + + match res { + Ok(response) => Ok(Ok(Response::from(response))), + Err(err) => Ok(Err(err)), + } + }; + + let request_rid = state + .resource_table + .add(FetchRequestResource(Box::pin(fut))); + + let cancel_handle_rid = + state.resource_table.add(FetchCancelHandle(cancel_handle)); + + (request_rid, None, Some(cancel_handle_rid)) + } + _ => return Err(type_error(format!("scheme '{}' not supported", scheme))), + }; + + Ok(FetchReturn { + request_rid, + request_body_rid, + cancel_handle_rid, + }) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FetchResponse { + status: u16, + status_text: String, + headers: Vec<(ByteString, ByteString)>, + url: String, + response_rid: ResourceId, +} + +pub async fn op_fetch_send( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<FetchResponse, AnyError> { + let request = state + .borrow_mut() + .resource_table + .take::<FetchRequestResource>(rid) + .ok_or_else(bad_resource_id)?; + + let request = Rc::try_unwrap(request) + .ok() + .expect("multiple op_fetch_send ongoing"); + + let res = match request.0.await { + Ok(Ok(res)) => res, + Ok(Err(err)) => return Err(type_error(err.to_string())), + Err(_) => return Err(type_error("request was cancelled")), + }; + + //debug!("Fetch response {}", url); + let status = res.status(); + let url = res.url().to_string(); + let mut res_headers = Vec::new(); + for (key, val) in res.headers().iter() { + let key_bytes: &[u8] = key.as_ref(); + res_headers.push(( + ByteString(key_bytes.to_owned()), + ByteString(val.as_bytes().to_owned()), + )); + } + + let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let stream_reader = StreamReader::new(stream); + let rid = state + .borrow_mut() + .resource_table + .add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream_reader), + cancel: CancelHandle::default(), + }); + + Ok(FetchResponse { + status: status.as_u16(), + status_text: status.canonical_reason().unwrap_or("").to_string(), + headers: res_headers, + url, + response_rid: rid, + }) +} + +pub async fn op_fetch_request_write( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: Option<ZeroCopyBuf>, +) -> Result<(), AnyError> { + let data = data.ok_or_else(null_opbuf)?; + let buf = Vec::from(&*data); + + let resource = state + .borrow() + .resource_table + .get::<FetchRequestBodyResource>(rid) + .ok_or_else(bad_resource_id)?; + let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + body.send(Ok(buf)).or_cancel(cancel).await?.map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; + + Ok(()) +} + +pub async fn op_fetch_response_read( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: Option<ZeroCopyBuf>, +) -> Result<usize, AnyError> { + let data = data.ok_or_else(null_opbuf)?; + + let resource = state + .borrow() + .resource_table + .get::<FetchResponseBodyResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let mut buf = data.clone(); + let read = reader.read(&mut buf).try_or_cancel(cancel).await?; + Ok(read) +} + +type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>; + +struct FetchRequestResource( + Pin<Box<dyn Future<Output = CancelableResponseResult>>>, +); + +impl Resource for FetchRequestResource { + fn name(&self) -> Cow<str> { + "fetchRequest".into() + } +} + +struct FetchCancelHandle(Rc<CancelHandle>); + +impl Resource for FetchCancelHandle { + fn name(&self) -> Cow<str> { + "fetchCancelHandle".into() + } + + fn close(self: Rc<Self>) { + self.0.cancel() + } +} + +struct FetchRequestBodyResource { + body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>, + cancel: CancelHandle, +} + +impl Resource for FetchRequestBodyResource { + fn name(&self) -> Cow<str> { + "fetchRequestBody".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +type BytesStream = + Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; + +struct FetchResponseBodyResource { + reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, + cancel: CancelHandle, +} + +impl Resource for FetchResponseBodyResource { + fn name(&self) -> Cow<str> { + "fetchResponseBody".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +struct HttpClientResource { + client: Client, +} + +impl Resource for HttpClientResource { + fn name(&self) -> Cow<str> { + "httpClient".into() + } +} + +impl HttpClientResource { + fn new(client: Client) -> Self { + Self { client } + } +} + +#[derive(Deserialize, Default, Debug)] +#[serde(rename_all = "camelCase")] +#[serde(default)] +pub struct CreateHttpClientOptions { + ca_stores: Option<Vec<String>>, + ca_file: Option<String>, + ca_data: Option<ByteString>, + proxy: Option<Proxy>, +} + +pub fn op_create_http_client<FP>( + state: &mut OpState, + args: CreateHttpClientOptions, + _: (), +) -> Result<ResourceId, AnyError> +where + FP: FetchPermissions + 'static, +{ + if let Some(ca_file) = args.ca_file.clone() { + let permissions = state.borrow_mut::<FP>(); + permissions.check_read(&PathBuf::from(ca_file))?; + } + + if let Some(proxy) = args.proxy.clone() { + let permissions = state.borrow_mut::<FP>(); + let url = Url::parse(&proxy.url)?; + permissions.check_net_url(&url)?; + } + + let defaults = state.borrow::<HttpClientDefaults>(); + let cert_data = + get_cert_data(args.ca_file.as_deref(), args.ca_data.as_deref())?; + + let client = create_http_client( + defaults.user_agent.clone(), + defaults.root_cert_store.clone(), + cert_data, + args.proxy, + defaults.unsafely_ignore_certificate_errors.clone(), + ) + .unwrap(); + + let rid = state.resource_table.add(HttpClientResource::new(client)); + Ok(rid) +} + +fn get_cert_data( + ca_file: Option<&str>, + ca_data: Option<&[u8]>, +) -> Result<Option<Vec<u8>>, AnyError> { + if let Some(ca_data) = ca_data { + Ok(Some(ca_data.to_vec())) + } else if let Some(ca_file) = ca_file { + let mut buf = Vec::new(); + File::open(ca_file)?.read_to_end(&mut buf)?; + Ok(Some(buf)) + } else { + Ok(None) + } +} |