diff options
Diffstat (limited to 'op_crates/fetch/lib.rs')
-rw-r--r-- | op_crates/fetch/lib.rs | 497 |
1 files changed, 0 insertions, 497 deletions
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs deleted file mode 100644 index 861fc6e54..000000000 --- a/op_crates/fetch/lib.rs +++ /dev/null @@ -1,497 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::bad_resource_id; -use deno_core::error::generic_error; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::Future; -use deno_core::futures::Stream; -use deno_core::futures::StreamExt; -use deno_core::include_js_files; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::url::Url; -use deno_core::AsyncRefCell; -use deno_core::CancelFuture; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; - -use data_url::DataUrl; -use deno_file::BlobUrlStore; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; -use reqwest::header::USER_AGENT; -use reqwest::redirect::Policy; -use reqwest::Body; -use reqwest::Client; -use reqwest::Method; -use reqwest::Response; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::convert::From; -use std::fs::File; -use std::io::Read; -use std::path::Path; -use std::path::PathBuf; -use std::pin::Pin; -use std::rc::Rc; -use tokio::io::AsyncReadExt; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::StreamReader; - -pub use reqwest; // Re-export reqwest - -pub fn init<P: FetchPermissions + 'static>( - user_agent: String, - ca_data: Option<Vec<u8>>, -) -> Extension { - Extension::builder() - .js(include_js_files!( - prefix "deno:op_crates/fetch", - "01_fetch_util.js", - "11_streams.js", - "20_headers.js", - "21_formdata.js", - "22_body.js", - "22_http_client.js", - "23_request.js", - "23_response.js", - "26_fetch.js", - )) - .ops(vec![ - ("op_fetch", op_sync(op_fetch::<P>)), - ("op_fetch_send", op_async(op_fetch_send)), - ("op_fetch_request_write", op_async(op_fetch_request_write)), - ("op_fetch_response_read", op_async(op_fetch_response_read)), - ("op_create_http_client", op_sync(op_create_http_client::<P>)), - ]) - .state(move |state| { - state.put::<reqwest::Client>({ - create_http_client(user_agent.clone(), ca_data.clone()).unwrap() - }); - state.put::<HttpClientDefaults>(HttpClientDefaults { - ca_data: ca_data.clone(), - user_agent: user_agent.clone(), - }); - Ok(()) - }) - .build() -} - -pub struct HttpClientDefaults { - pub user_agent: String, - pub ca_data: Option<Vec<u8>>, -} - -pub trait FetchPermissions { - fn check_net_url(&mut self, _url: &Url) -> Result<(), AnyError>; - fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>; -} - -/// For use with `op_fetch` when the user does not want permissions. -pub struct NoFetchPermissions; - -impl FetchPermissions for NoFetchPermissions { - fn check_net_url(&mut self, _url: &Url) -> Result<(), AnyError> { - Ok(()) - } - - fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> { - Ok(()) - } -} - -pub fn get_declaration() -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_fetch.d.ts") -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct FetchArgs { - method: String, - url: String, - headers: Vec<(String, String)>, - client_rid: Option<u32>, - has_body: bool, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct FetchReturn { - request_rid: ResourceId, - request_body_rid: Option<ResourceId>, -} - -pub fn op_fetch<FP>( - state: &mut OpState, - args: FetchArgs, - data: Option<ZeroCopyBuf>, -) -> Result<FetchReturn, AnyError> -where - FP: FetchPermissions + 'static, -{ - let client = if let Some(rid) = args.client_rid { - let r = state - .resource_table - .get::<HttpClientResource>(rid) - .ok_or_else(bad_resource_id)?; - r.client.clone() - } else { - let client = state.borrow::<reqwest::Client>(); - client.clone() - }; - - let method = Method::from_bytes(args.method.as_bytes())?; - let url = Url::parse(&args.url)?; - - // Check scheme before asking for net permission - let scheme = url.scheme(); - let (request_rid, request_body_rid) = match scheme { - "http" | "https" => { - let permissions = state.borrow_mut::<FP>(); - permissions.check_net_url(&url)?; - - let mut request = client.request(method, url); - - let request_body_rid = if args.has_body { - match data { - None => { - // If no body is passed, we return a writer for streaming the body. - let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); - request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); - - let request_body_rid = - state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(tx), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) - } - Some(data) => { - // If a body is passed, we use it, and don't return a body for streaming. - request = request.body(Vec::from(&*data)); - None - } - } - } else { - None - }; - - for (key, value) in args.headers { - let name = HeaderName::from_bytes(key.as_bytes()).unwrap(); - let v = HeaderValue::from_str(&value).unwrap(); - request = request.header(name, v); - } - - let fut = request.send(); - - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); - - (request_rid, request_body_rid) - } - "data" => { - let data_url = DataUrl::process(url.as_str()) - .map_err(|e| type_error(format!("{:?}", e)))?; - - let (body, _) = data_url - .decode_to_vec() - .map_err(|e| type_error(format!("{:?}", e)))?; - - let response = http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) - .body(reqwest::Body::from(body))?; - - let fut = async move { Ok(Response::from(response)) }; - - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); - - (request_rid, None) - } - "blob" => { - let blob_url_storage = - state.try_borrow::<BlobUrlStore>().ok_or_else(|| { - type_error("Blob URLs are not supported in this context.") - })?; - - let blob = blob_url_storage - .get(url)? - .ok_or_else(|| type_error("Blob for the given URL not found."))?; - - if method != "GET" { - return Err(type_error("Blob URL fetch only supports GET method.")); - } - - let response = http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_LENGTH, blob.data.len()) - .header(http::header::CONTENT_TYPE, blob.media_type) - .body(reqwest::Body::from(blob.data))?; - - let fut = async move { Ok(Response::from(response)) }; - - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); - - (request_rid, None) - } - _ => return Err(type_error(format!("scheme '{}' not supported", scheme))), - }; - - Ok(FetchReturn { - request_rid, - request_body_rid, - }) -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct FetchResponse { - status: u16, - status_text: String, - headers: Vec<(String, String)>, - url: String, - response_rid: ResourceId, -} - -pub async fn op_fetch_send( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _data: Option<ZeroCopyBuf>, -) -> Result<FetchResponse, AnyError> { - let request = state - .borrow_mut() - .resource_table - .take::<FetchRequestResource>(rid) - .ok_or_else(bad_resource_id)?; - - let request = Rc::try_unwrap(request) - .ok() - .expect("multiple op_fetch_send ongoing"); - - let res = match request.0.await { - Ok(res) => res, - Err(e) => return Err(type_error(e.to_string())), - }; - - //debug!("Fetch response {}", url); - let status = res.status(); - let url = res.url().to_string(); - let mut res_headers = Vec::new(); - for (key, val) in res.headers().iter() { - let key_string = key.to_string(); - - if val.as_bytes().is_ascii() { - res_headers.push((key_string, val.to_str().unwrap().to_owned())) - } else { - res_headers.push(( - key_string, - val - .as_bytes() - .iter() - .map(|&c| c as char) - .collect::<String>(), - )); - } - } - - let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let stream_reader = StreamReader::new(stream); - let rid = state - .borrow_mut() - .resource_table - .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream_reader), - cancel: CancelHandle::default(), - }); - - Ok(FetchResponse { - status: status.as_u16(), - status_text: status.canonical_reason().unwrap_or("").to_string(), - headers: res_headers, - url, - response_rid: rid, - }) -} - -pub async fn op_fetch_request_write( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: Option<ZeroCopyBuf>, -) -> Result<(), AnyError> { - let data = data.ok_or_else(null_opbuf)?; - let buf = Vec::from(&*data); - - let resource = state - .borrow() - .resource_table - .get::<FetchRequestBodyResource>(rid) - .ok_or_else(bad_resource_id)?; - let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - body.send(Ok(buf)).or_cancel(cancel).await??; - - Ok(()) -} - -pub async fn op_fetch_response_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: Option<ZeroCopyBuf>, -) -> Result<usize, AnyError> { - let data = data.ok_or_else(null_opbuf)?; - - let resource = state - .borrow() - .resource_table - .get::<FetchResponseBodyResource>(rid) - .ok_or_else(bad_resource_id)?; - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut buf = data.clone(); - let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok(read) -} - -struct FetchRequestResource( - Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>>>>, -); - -impl Resource for FetchRequestResource { - fn name(&self) -> Cow<str> { - "fetchRequest".into() - } -} - -struct FetchRequestBodyResource { - body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>, - cancel: CancelHandle, -} - -impl Resource for FetchRequestBodyResource { - fn name(&self) -> Cow<str> { - "fetchRequestBody".into() - } -} - -type BytesStream = - Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; - -struct FetchResponseBodyResource { - reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, - cancel: CancelHandle, -} - -impl Resource for FetchResponseBodyResource { - fn name(&self) -> Cow<str> { - "fetchResponseBody".into() - } -} - -struct HttpClientResource { - client: Client, -} - -impl Resource for HttpClientResource { - fn name(&self) -> Cow<str> { - "httpClient".into() - } -} - -impl HttpClientResource { - fn new(client: Client) -> Self { - Self { client } - } -} - -#[derive(Deserialize, Default, Debug)] -#[serde(rename_all = "camelCase")] -#[serde(default)] -pub struct CreateHttpClientOptions { - ca_file: Option<String>, - ca_data: Option<String>, -} - -pub fn op_create_http_client<FP>( - state: &mut OpState, - args: CreateHttpClientOptions, - _zero_copy: Option<ZeroCopyBuf>, -) -> Result<ResourceId, AnyError> -where - FP: FetchPermissions + 'static, -{ - if let Some(ca_file) = args.ca_file.clone() { - let permissions = state.borrow_mut::<FP>(); - permissions.check_read(&PathBuf::from(ca_file))?; - } - - let defaults = state.borrow::<HttpClientDefaults>(); - - let cert_data = - get_cert_data(args.ca_file.as_deref(), args.ca_data.as_deref())?; - let client = create_http_client( - defaults.user_agent.clone(), - cert_data.or_else(|| defaults.ca_data.clone()), - ) - .unwrap(); - - let rid = state.resource_table.add(HttpClientResource::new(client)); - Ok(rid) -} - -fn get_cert_data( - ca_file: Option<&str>, - ca_data: Option<&str>, -) -> Result<Option<Vec<u8>>, AnyError> { - if let Some(ca_data) = ca_data { - Ok(Some(ca_data.as_bytes().to_vec())) - } else if let Some(ca_file) = ca_file { - let mut buf = Vec::new(); - File::open(ca_file)?.read_to_end(&mut buf)?; - Ok(Some(buf)) - } else { - Ok(None) - } -} - -/// Create new instance of async reqwest::Client. This client supports -/// proxies and doesn't follow redirects. -pub fn create_http_client( - user_agent: String, - ca_data: Option<Vec<u8>>, -) -> Result<Client, AnyError> { - let mut headers = HeaderMap::new(); - headers.insert(USER_AGENT, user_agent.parse().unwrap()); - let mut builder = Client::builder() - .redirect(Policy::none()) - .default_headers(headers) - .use_rustls_tls(); - - if let Some(ca_data) = ca_data { - let cert = reqwest::Certificate::from_pem(&ca_data)?; - builder = builder.add_root_certificate(cert); - } - - builder - .build() - .map_err(|e| generic_error(format!("Unable to build http client: {}", e))) -} |