diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/node/polyfills | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/node/polyfills')
-rw-r--r-- | ext/node/polyfills/_http_outgoing.ts | 3 | ||||
-rw-r--r-- | ext/node/polyfills/http.ts | 36 |
2 files changed, 20 insertions, 19 deletions
diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts index 50869ad82..8882ade55 100644 --- a/ext/node/polyfills/_http_outgoing.ts +++ b/ext/node/polyfills/_http_outgoing.ts @@ -4,7 +4,6 @@ // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials -const core = globalThis.__bootstrap.core; import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs"; import assert from "ext:deno_node/internal/assert.mjs"; import EE from "node:events"; @@ -544,7 +543,7 @@ export class OutgoingMessage extends Stream { data = new Uint8Array(data.buffer); } if (data.buffer.byteLength > 0) { - core.writeAll(this._bodyWriteRid, data).then(() => { + this._bodyWriter.write(data).then(() => { callback?.(); this.emit("drain"); }).catch((e) => { diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 475d691cc..a694c9e9b 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js"; +import { resourceForReadableStream } from "ext:deno_web/06_streams.js"; import { TcpConn } from "ext:deno_net/01_net.js"; enum STATUS_CODES { @@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage { const client = this._getClient() ?? createHttpClient({ http2: false }); this._client = client; + if ( + this.method === "POST" || this.method === "PATCH" || this.method === "PUT" + ) { + const { readable, writable } = new TransformStream({ + cancel: (e) => { + this._requestSendError = e; + }, + }); + + this._bodyWritable = writable; + this._bodyWriter = writable.getWriter(); + + this._bodyWriteRid = resourceForReadableStream(readable); + } + this._req = core.ops.op_node_http_request( this.method, url, headers, client.rid, - (this.method === "POST" || this.method === "PATCH" || - this.method === "PUT") && this._contentLength !== 0, + this._bodyWriteRid, ); - this._bodyWriteRid = this._req.requestBodyRid; } _implicitHeader() { @@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage { this._implicitHeader(); this._send("", "latin1"); } + this._bodyWriter?.close(); (async () => { try { - const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send", this._req.requestRid), - (async () => { - if (this._bodyWriteRid) { - try { - await core.shutdown(this._bodyWriteRid); - } catch (err) { - this._requestSendError = err; - } - - core.tryClose(this._bodyWriteRid); - } - })(), - ]); + const res = await core.opAsync("op_fetch_send", this._req.requestRid); try { cb?.(); } catch (_) { |