diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/node/polyfills/http.ts | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/node/polyfills/http.ts')
-rw-r--r-- | ext/node/polyfills/http.ts | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 475d691cc..a694c9e9b 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js"; +import { resourceForReadableStream } from "ext:deno_web/06_streams.js"; import { TcpConn } from "ext:deno_net/01_net.js"; enum STATUS_CODES { @@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage { const client = this._getClient() ?? createHttpClient({ http2: false }); this._client = client; + if ( + this.method === "POST" || this.method === "PATCH" || this.method === "PUT" + ) { + const { readable, writable } = new TransformStream({ + cancel: (e) => { + this._requestSendError = e; + }, + }); + + this._bodyWritable = writable; + this._bodyWriter = writable.getWriter(); + + this._bodyWriteRid = resourceForReadableStream(readable); + } + this._req = core.ops.op_node_http_request( this.method, url, headers, client.rid, - (this.method === "POST" || this.method === "PATCH" || - this.method === "PUT") && this._contentLength !== 0, + this._bodyWriteRid, ); - this._bodyWriteRid = this._req.requestBodyRid; } _implicitHeader() { @@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage { this._implicitHeader(); this._send("", "latin1"); } + this._bodyWriter?.close(); (async () => { try { - const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send", this._req.requestRid), - (async () => { - if (this._bodyWriteRid) { - try { - await core.shutdown(this._bodyWriteRid); - } catch (err) { - this._requestSendError = err; - } - - core.tryClose(this._bodyWriteRid); - } - })(), - ]); + const res = await core.opAsync("op_fetch_send", this._req.requestRid); try { cb?.(); } catch (_) { |