summaryrefslogtreecommitdiff
path: root/ext/node
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/node
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/node')
-rw-r--r--ext/node/ops/http.rs25
-rw-r--r--ext/node/polyfills/_http_outgoing.ts3
-rw-r--r--ext/node/polyfills/http.ts36
3 files changed, 27 insertions, 37 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index 40ef6df32..fd593244c 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -4,18 +4,17 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::url::Url;
-use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::ResourceId;
use deno_fetch::get_or_create_client_from_state;
-use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
-use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
+use deno_fetch::ResourceToBodyAdapter;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -31,7 +30,7 @@ pub fn op_node_http_request<P>(
#[string] url: String,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
- has_body: bool,
+ #[smi] body: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
P: crate::NodePermissions + 'static,
@@ -63,25 +62,16 @@ where
let mut request = client.request(method.clone(), url).headers(header_map);
- let request_body_rid = if has_body {
- // If no body is passed, we return a writer for streaming the body.
- let (tx, stream) = tokio::sync::mpsc::channel(1);
-
- request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
-
- let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(Some(tx)),
- cancel: CancelHandle::default(),
- });
-
- Some(request_body_rid)
+ if let Some(body) = body {
+ request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new(
+ state.resource_table.take_any(body)?,
+ )));
} else {
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
- None
};
let cancel_handle = CancelHandle::new_rc();
@@ -104,7 +94,6 @@ where
Ok(FetchReturn {
request_rid,
- request_body_rid,
cancel_handle_rid: Some(cancel_handle_rid),
})
}
diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts
index 50869ad82..8882ade55 100644
--- a/ext/node/polyfills/_http_outgoing.ts
+++ b/ext/node/polyfills/_http_outgoing.ts
@@ -4,7 +4,6 @@
// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials
-const core = globalThis.__bootstrap.core;
import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs";
import assert from "ext:deno_node/internal/assert.mjs";
import EE from "node:events";
@@ -544,7 +543,7 @@ export class OutgoingMessage extends Stream {
data = new Uint8Array(data.buffer);
}
if (data.buffer.byteLength > 0) {
- core.writeAll(this._bodyWriteRid, data).then(() => {
+ this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts
index 475d691cc..a694c9e9b 100644
--- a/ext/node/polyfills/http.ts
+++ b/ext/node/polyfills/http.ts
@@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js";
import { headersEntries } from "ext:deno_fetch/20_headers.js";
import { timerId } from "ext:deno_web/03_abort_signal.js";
import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js";
+import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
import { TcpConn } from "ext:deno_net/01_net.js";
enum STATUS_CODES {
@@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage {
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;
+ if (
+ this.method === "POST" || this.method === "PATCH" || this.method === "PUT"
+ ) {
+ const { readable, writable } = new TransformStream({
+ cancel: (e) => {
+ this._requestSendError = e;
+ },
+ });
+
+ this._bodyWritable = writable;
+ this._bodyWriter = writable.getWriter();
+
+ this._bodyWriteRid = resourceForReadableStream(readable);
+ }
+
this._req = core.ops.op_node_http_request(
this.method,
url,
headers,
client.rid,
- (this.method === "POST" || this.method === "PATCH" ||
- this.method === "PUT") && this._contentLength !== 0,
+ this._bodyWriteRid,
);
- this._bodyWriteRid = this._req.requestBodyRid;
}
_implicitHeader() {
@@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage {
this._implicitHeader();
this._send("", "latin1");
}
+ this._bodyWriter?.close();
(async () => {
try {
- const [res, _] = await Promise.all([
- core.opAsync("op_fetch_send", this._req.requestRid),
- (async () => {
- if (this._bodyWriteRid) {
- try {
- await core.shutdown(this._bodyWriteRid);
- } catch (err) {
- this._requestSendError = err;
- }
-
- core.tryClose(this._bodyWriteRid);
- }
- })(),
- ]);
+ const res = await core.opAsync("op_fetch_send", this._req.requestRid);
try {
cb?.();
} catch (_) {