diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/fetch/26_fetch.js | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/fetch/26_fetch.js')
-rw-r--r-- | ext/fetch/26_fetch.js | 179 |
1 files changed, 30 insertions, 149 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index e586d9a3a..8a71d9bcf 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -14,11 +14,12 @@ const core = globalThis.Deno.core; const ops = core.ops; import * as webidl from "ext:deno_webidl/00_webidl.js"; import { byteLowerCase } from "ext:deno_web/00_infra.js"; -import { BlobPrototype } from "ext:deno_web/09_file.js"; import { errorReadableStream, + getReadableStreamResourceBacking, readableStreamForRid, ReadableStreamPrototype, + resourceForReadableStream, } from "ext:deno_web/06_streams.js"; import { extractBody, InnerBody } from "ext:deno_fetch/22_body.js"; import { processUrlList, toInnerRequest } from "ext:deno_fetch/23_request.js"; @@ -37,22 +38,17 @@ const { ArrayPrototypeSplice, ArrayPrototypeFilter, ArrayPrototypeIncludes, + Error, ObjectPrototypeIsPrototypeOf, Promise, PromisePrototypeThen, PromisePrototypeCatch, SafeArrayIterator, - SafeWeakMap, String, StringPrototypeStartsWith, StringPrototypeToLowerCase, TypeError, - Uint8Array, Uint8ArrayPrototype, - WeakMapPrototypeDelete, - WeakMapPrototypeGet, - WeakMapPrototypeHas, - WeakMapPrototypeSet, } = primordials; const REQUEST_BODY_HEADER_NAMES = [ @@ -62,28 +58,9 @@ const REQUEST_BODY_HEADER_NAMES = [ "content-type", ]; -const requestBodyReaders = new SafeWeakMap(); - -/** - * @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args - * @param {Uint8Array | null} body - * @returns {{ requestRid: number, requestBodyRid: number | null, cancelHandleRid: number | null }} - */ -function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) { - return ops.op_fetch( - method, - url, - headers, - clientRid, - hasBody, - bodyLength, - body, - ); -} - /** * @param {number} rid - * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>} + * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number, error: string? }>} */ function opFetchSend(rid) { return core.opAsync("op_fetch_send", rid); @@ -145,154 +122,59 @@ async function mainFetch(req, recursive, terminator) { /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ let reqBody = null; - - if (req.body !== null) { - if ( - ObjectPrototypeIsPrototypeOf( - ReadableStreamPrototype, - req.body.streamOrStatic, - ) - ) { - if ( - req.body.length === null || - ObjectPrototypeIsPrototypeOf(BlobPrototype, req.body.source) - ) { - reqBody = req.body.stream; + let reqRid = null; + + if (req.body) { + const stream = req.body.streamOrStatic; + const body = stream.body; + + if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { + reqBody = body; + } else if (typeof body === "string") { + reqBody = core.encode(body); + } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { + const resourceBacking = getReadableStreamResourceBacking(stream); + if (resourceBacking) { + reqRid = resourceBacking.rid; } else { - const reader = req.body.stream.getReader(); - WeakMapPrototypeSet(requestBodyReaders, req, reader); - const r1 = await reader.read(); - if (r1.done) { - reqBody = new Uint8Array(0); - } else { - reqBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - WeakMapPrototypeDelete(requestBodyReaders, req); + reqRid = resourceForReadableStream(stream, req.body.length); } } else { - req.body.streamOrStatic.consumed = true; - reqBody = req.body.streamOrStatic.body; - // TODO(@AaronO): plumb support for StringOrBuffer all the way - reqBody = typeof reqBody === "string" ? core.encode(reqBody) : reqBody; + throw TypeError("invalid body"); } } - const { requestRid, requestBodyRid, cancelHandleRid } = opFetch( + const { requestRid, cancelHandleRid } = ops.op_fetch( req.method, req.currentUrl(), req.headerList, req.clientRid, - reqBody !== null, - req.body?.length, - ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, reqBody) ? reqBody : null, + reqBody !== null || reqRid !== null, + reqBody, + reqRid, ); function onAbort() { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); } - if (requestBodyRid !== null) { - core.tryClose(requestBodyRid); - } } terminator[abortSignal.add](onAbort); - - let requestSendError; - let requestSendErrorSet = false; - - async function propagateError(err, message) { - // TODO(lucacasonato): propagate error into response body stream - try { - await core.writeTypeError(requestBodyRid, message); - } catch (err) { - if (!requestSendErrorSet) { - requestSendErrorSet = true; - requestSendError = err; - } - } - if (!requestSendErrorSet) { - requestSendErrorSet = true; - requestSendError = err; - } - } - - if (requestBodyRid !== null) { - if ( - reqBody === null || - !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, reqBody) - ) { - throw new TypeError("Unreachable"); - } - const reader = reqBody.getReader(); - WeakMapPrototypeSet(requestBodyReaders, req, reader); - (async () => { - let done = false; - while (!done) { - let val; - try { - const res = await reader.read(); - done = res.done; - val = res.value; - } catch (err) { - if (terminator.aborted) break; - await propagateError(err, "failed to read"); - break; - } - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) { - const error = new TypeError( - "Item in request body ReadableStream is not a Uint8Array", - ); - await reader.cancel(error); - await propagateError(error, error.message); - break; - } - try { - await core.writeAll(requestBodyRid, val); - } catch (err) { - if (terminator.aborted) break; - await reader.cancel(err); - await propagateError(err, "failed to write"); - break; - } - } - if (done && !terminator.aborted) { - try { - await core.shutdown(requestBodyRid); - } catch (err) { - if (!terminator.aborted) { - await propagateError(err, "failed to flush"); - } - } - } - WeakMapPrototypeDelete(requestBodyReaders, req); - reader.releaseLock(); - core.tryClose(requestBodyRid); - })(); - } let resp; try { resp = await opFetchSend(requestRid); } catch (err) { if (terminator.aborted) return; - if (requestSendErrorSet) { - // if the request body stream errored, we want to propagate that error - // instead of the original error from opFetchSend - throw new TypeError("Failed to fetch: request body stream errored", { - cause: requestSendError, - }); - } - if (requestBodyRid !== null) { - core.tryClose(requestBodyRid); - } throw err; } finally { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); } } + // Re-throw any body errors + if (resp.error) { + throw new TypeError("body failed", { cause: new Error(resp.error) }); + } if (terminator.aborted) return abortedNetworkError(); processUrlList(req.urlList, req.urlListProcessed); @@ -510,9 +392,8 @@ function fetch(input, init = {}) { function abortFetch(request, responseObject, error) { if (request.body !== null) { - if (WeakMapPrototypeHas(requestBodyReaders, request)) { - WeakMapPrototypeGet(requestBodyReaders, request).cancel(error); - } else { + // Cancel the body if we haven't taken it as a resource yet + if (!request.body.streamOrStatic.locked) { request.body.cancel(error); } } |