summaryrefslogtreecommitdiff
path: root/ext/node/ops/http.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/node/ops/http.rs
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/node/ops/http.rs')
-rw-r--r--ext/node/ops/http.rs25
1 files changed, 7 insertions, 18 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index 40ef6df32..fd593244c 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -4,18 +4,17 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::url::Url;
-use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::ResourceId;
use deno_fetch::get_or_create_client_from_state;
-use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
-use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
+use deno_fetch::ResourceToBodyAdapter;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -31,7 +30,7 @@ pub fn op_node_http_request<P>(
#[string] url: String,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
- has_body: bool,
+ #[smi] body: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
P: crate::NodePermissions + 'static,
@@ -63,25 +62,16 @@ where
let mut request = client.request(method.clone(), url).headers(header_map);
- let request_body_rid = if has_body {
- // If no body is passed, we return a writer for streaming the body.
- let (tx, stream) = tokio::sync::mpsc::channel(1);
-
- request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
-
- let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(Some(tx)),
- cancel: CancelHandle::default(),
- });
-
- Some(request_body_rid)
+ if let Some(body) = body {
+ request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new(
+ state.resource_table.take_any(body)?,
+ )));
} else {
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
- None
};
let cancel_handle = CancelHandle::new_rc();
@@ -104,7 +94,6 @@ where
Ok(FetchReturn {
request_rid,
- request_body_rid,
cancel_handle_rid: Some(cancel_handle_rid),
})
}