summaryrefslogtreecommitdiff
path: root/ext/node/ops/http.rs
blob: 2a4d31f50a8998df03d1b06d3394b1c58ee68d66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_fetch::get_or_create_client_from_state;
use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use reqwest::header::CONTENT_LENGTH;
use reqwest::Body;
use reqwest::Method;

/// Prepares an outgoing HTTP request on behalf of Node's `ClientRequest`
/// and registers the pieces needed to drive it in the resource table.
///
/// The HTTP client is taken from the `HttpClientResource` stored under
/// `client_rid` when one is given; otherwise it is obtained through
/// `get_or_create_client_from_state`. `method` and `url` are parsed, the
/// permissions object of type `P` is asked to approve the URL, and
/// `headers` are appended to the request in the order they were supplied.
///
/// When `has_body` is true the request body is backed by a channel whose
/// sending half is stored as a `FetchRequestBodyResource`. When it is false,
/// POST and PUT requests get an explicit `Content-Length: 0` header.
///
/// # Returns
///
/// A `FetchReturn` holding the resource id of the pending request future,
/// the resource id of the body writer (only when `has_body` is true) and
/// the resource id of the handle that cancels the request.
///
/// # Errors
///
/// Fails when the `client_rid` lookup fails, when `method` or `url` cannot
/// be parsed, or when the permission check rejects the URL. An invalid
/// header name or value is reported as a type error.
#[op]
pub fn op_node_http_request<P>(
  state: &mut OpState,
  method: ByteString,
  url: String,
  headers: Vec<(ByteString, ByteString)>,
  client_rid: Option<u32>,
  has_body: bool,
) -> Result<FetchReturn, AnyError>
where
  P: crate::NodePermissions + 'static,
{
  // Prefer the caller-supplied client; otherwise use the one kept in state.
  let client = match client_rid {
    Some(rid) => {
      let resource = state.resource_table.get::<HttpClientResource>(rid)?;
      resource.client.clone()
    }
    None => get_or_create_client_from_state(state)?,
  };

  let method = Method::from_bytes(&method)?;
  let url = Url::parse(&url)?;

  // The URL must be approved before any request is built.
  state
    .borrow_mut::<P>()
    .check_net_url(&url, "ClientRequest")?;

  // `append` (rather than `insert`) keeps repeated header names.
  let mut header_map = HeaderMap::new();
  headers.into_iter().try_for_each(
    |(raw_name, raw_value)| -> Result<(), AnyError> {
      let name = HeaderName::from_bytes(&raw_name)
        .map_err(|err| type_error(err.to_string()))?;
      let value = HeaderValue::from_bytes(&raw_value)
        .map_err(|err| type_error(err.to_string()))?;
      header_map.append(name, value);
      Ok(())
    },
  )?;

  // Decided up front because `method` is moved into the request builder.
  let wants_zero_content_length =
    matches!(method, Method::POST | Method::PUT);

  let mut request = client.request(method, url).headers(header_map);

  let request_body_rid = if has_body {
    // The body is streamed: the request reads from the receiving half of a
    // channel and the sending half is handed out as a resource.
    let (body_tx, body_rx) = tokio::sync::mpsc::channel(1);
    request = request.body(Body::wrap_stream(FetchBodyStream(body_rx)));

    let body_resource = FetchRequestBodyResource {
      body: AsyncRefCell::new(Some(body_tx)),
      cancel: CancelHandle::default(),
    };
    Some(state.resource_table.add(body_resource))
  } else {
    // A POST or PUT without a body still announces a zero-length body.
    // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
    if wants_zero_content_length {
      request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
    }
    None
  };

  // One handle goes into the future, the other is exposed as a resource.
  let cancel = CancelHandle::new_rc();
  let cancel_for_send = cancel.clone();

  let response_future = async move {
    let outcome = request.send().or_cancel(cancel_for_send).await;
    outcome.map(|sent| sent.map_err(|err| type_error(err.to_string())))
  };

  let request_rid = state
    .resource_table
    .add(FetchRequestResource(Box::pin(response_future)));
  let cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel));

  Ok(FetchReturn {
    request_rid,
    request_body_rid,
    cancel_handle_rid: Some(cancel_handle_rid),
  })
}