diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/node/ops/http.rs | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/node/ops/http.rs')
-rw-r--r-- | ext/node/ops/http.rs | 25 |
1 files changed, 7 insertions, 18 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 40ef6df32..fd593244c 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -4,18 +4,17 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op2; use deno_core::url::Url; -use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::ResourceId; use deno_fetch::get_or_create_client_from_state; -use deno_fetch::FetchBodyStream; use deno_fetch::FetchCancelHandle; -use deno_fetch::FetchRequestBodyResource; use deno_fetch::FetchRequestResource; use deno_fetch::FetchReturn; use deno_fetch::HttpClientResource; +use deno_fetch::ResourceToBodyAdapter; use reqwest::header::HeaderMap; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; @@ -31,7 +30,7 @@ pub fn op_node_http_request<P>( #[string] url: String, #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] client_rid: Option<u32>, - has_body: bool, + #[smi] body: Option<ResourceId>, ) -> Result<FetchReturn, AnyError> where P: crate::NodePermissions + 'static, @@ -63,25 +62,16 @@ where let mut request = client.request(method.clone(), url).headers(header_map); - let request_body_rid = if has_body { - // If no body is passed, we return a writer for streaming the body. - let (tx, stream) = tokio::sync::mpsc::channel(1); - - request = request.body(Body::wrap_stream(FetchBodyStream(stream))); - - let request_body_rid = state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(Some(tx)), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) + if let Some(body) = body { + request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new( + state.resource_table.take_any(body)?, + ))); } else { // POST and PUT requests should always have a 0 length content-length, // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch if matches!(method, Method::POST | Method::PUT) { request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - None }; let cancel_handle = CancelHandle::new_rc(); @@ -104,7 +94,6 @@ where Ok(FetchReturn { request_rid, - request_body_rid, cancel_handle_rid: Some(cancel_handle_rid), }) } |