summaryrefslogtreecommitdiff
path: root/ext/node/ops/http.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-03 14:27:25 -0600
committerGitHub <noreply@github.com>2023-08-03 14:27:25 -0600
commit7f8bf2537db0ae596a2c1baabd4011a190666ca6 (patch)
tree3dfb8df29ef39ee5eed9bc19dc57135374a338bd /ext/node/ops/http.rs
parent0f07dc95f130b9ace00ad98f1b2a3f5c34662e4a (diff)
refactor(ext/fetch): refactor fetch to use new write_error method (#20029)
This is a prerequisite for fast streams work -- this particular resource used a custom `mpsc`-style stream, and this work will allow us to unify it with the streams in `ext/http` in time. Instead of using Option as an internal semaphore for "correctly completed EOF", we allow code to propagate errors into the channel which can be picked up by downstream sinks like Hyper. EOF is signalled using a more standard sender drop.
Diffstat (limited to 'ext/node/ops/http.rs')
-rw-r--r--ext/node/ops/http.rs8
1 files changed, 4 insertions, 4 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index cc7dbf522..2a4d31f50 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -10,12 +10,12 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_fetch::get_or_create_client_from_state;
+use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
-use deno_fetch::MpscByteStream;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -64,12 +64,12 @@ where
let request_body_rid = if has_body {
// If no body is passed, we return a writer for streaming the body.
- let (stream, tx) = MpscByteStream::new();
+ let (tx, stream) = tokio::sync::mpsc::channel(1);
- request = request.body(Body::wrap_stream(stream));
+ request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(tx),
+ body: AsyncRefCell::new(Some(tx)),
cancel: CancelHandle::default(),
});