diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/node | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/node')
-rw-r--r-- | ext/node/ops/http.rs | 25 | ||||
-rw-r--r-- | ext/node/polyfills/_http_outgoing.ts | 3 | ||||
-rw-r--r-- | ext/node/polyfills/http.ts | 36 |
3 files changed, 27 insertions, 37 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 40ef6df32..fd593244c 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -4,18 +4,17 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op2; use deno_core::url::Url; -use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::ResourceId; use deno_fetch::get_or_create_client_from_state; -use deno_fetch::FetchBodyStream; use deno_fetch::FetchCancelHandle; -use deno_fetch::FetchRequestBodyResource; use deno_fetch::FetchRequestResource; use deno_fetch::FetchReturn; use deno_fetch::HttpClientResource; +use deno_fetch::ResourceToBodyAdapter; use reqwest::header::HeaderMap; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; @@ -31,7 +30,7 @@ pub fn op_node_http_request<P>( #[string] url: String, #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] client_rid: Option<u32>, - has_body: bool, + #[smi] body: Option<ResourceId>, ) -> Result<FetchReturn, AnyError> where P: crate::NodePermissions + 'static, @@ -63,25 +62,16 @@ where let mut request = client.request(method.clone(), url).headers(header_map); - let request_body_rid = if has_body { - // If no body is passed, we return a writer for streaming the body. - let (tx, stream) = tokio::sync::mpsc::channel(1); - - request = request.body(Body::wrap_stream(FetchBodyStream(stream))); - - let request_body_rid = state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(Some(tx)), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) + if let Some(body) = body { + request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new( + state.resource_table.take_any(body)?, + ))); } else { // POST and PUT requests should always have a 0 length content-length, // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch if matches!(method, Method::POST | Method::PUT) { request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - None }; let cancel_handle = CancelHandle::new_rc(); @@ -104,7 +94,6 @@ where Ok(FetchReturn { request_rid, - request_body_rid, cancel_handle_rid: Some(cancel_handle_rid), }) } diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts index 50869ad82..8882ade55 100644 --- a/ext/node/polyfills/_http_outgoing.ts +++ b/ext/node/polyfills/_http_outgoing.ts @@ -4,7 +4,6 @@ // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials -const core = globalThis.__bootstrap.core; import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs"; import assert from "ext:deno_node/internal/assert.mjs"; import EE from "node:events"; @@ -544,7 +543,7 @@ export class OutgoingMessage extends Stream { data = new Uint8Array(data.buffer); } if (data.buffer.byteLength > 0) { - core.writeAll(this._bodyWriteRid, data).then(() => { + this._bodyWriter.write(data).then(() => { callback?.(); this.emit("drain"); }).catch((e) => { diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 475d691cc..a694c9e9b 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js"; +import { resourceForReadableStream } from "ext:deno_web/06_streams.js"; import { TcpConn } from "ext:deno_net/01_net.js"; enum STATUS_CODES { @@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage { const client = this._getClient() ?? createHttpClient({ http2: false }); this._client = client; + if ( + this.method === "POST" || this.method === "PATCH" || this.method === "PUT" + ) { + const { readable, writable } = new TransformStream({ + cancel: (e) => { + this._requestSendError = e; + }, + }); + + this._bodyWritable = writable; + this._bodyWriter = writable.getWriter(); + + this._bodyWriteRid = resourceForReadableStream(readable); + } + this._req = core.ops.op_node_http_request( this.method, url, headers, client.rid, - (this.method === "POST" || this.method === "PATCH" || - this.method === "PUT") && this._contentLength !== 0, + this._bodyWriteRid, ); - this._bodyWriteRid = this._req.requestBodyRid; } _implicitHeader() { @@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage { this._implicitHeader(); this._send("", "latin1"); } + this._bodyWriter?.close(); (async () => { try { - const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send", this._req.requestRid), - (async () => { - if (this._bodyWriteRid) { - try { - await core.shutdown(this._bodyWriteRid); - } catch (err) { - this._requestSendError = err; - } - - core.tryClose(this._bodyWriteRid); - } - })(), - ]); + const res = await core.opAsync("op_fetch_send", this._req.requestRid); try { cb?.(); } catch (_) { |