summaryrefslogtreecommitdiff
path: root/ext/fetch/26_fetch.js
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/fetch/26_fetch.js
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/fetch/26_fetch.js')
-rw-r--r--ext/fetch/26_fetch.js179
1 files changed, 30 insertions, 149 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index e586d9a3a..8a71d9bcf 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -14,11 +14,12 @@ const core = globalThis.Deno.core;
const ops = core.ops;
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { byteLowerCase } from "ext:deno_web/00_infra.js";
-import { BlobPrototype } from "ext:deno_web/09_file.js";
import {
errorReadableStream,
+ getReadableStreamResourceBacking,
readableStreamForRid,
ReadableStreamPrototype,
+ resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { extractBody, InnerBody } from "ext:deno_fetch/22_body.js";
import { processUrlList, toInnerRequest } from "ext:deno_fetch/23_request.js";
@@ -37,22 +38,17 @@ const {
ArrayPrototypeSplice,
ArrayPrototypeFilter,
ArrayPrototypeIncludes,
+ Error,
ObjectPrototypeIsPrototypeOf,
Promise,
PromisePrototypeThen,
PromisePrototypeCatch,
SafeArrayIterator,
- SafeWeakMap,
String,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
TypeError,
- Uint8Array,
Uint8ArrayPrototype,
- WeakMapPrototypeDelete,
- WeakMapPrototypeGet,
- WeakMapPrototypeHas,
- WeakMapPrototypeSet,
} = primordials;
const REQUEST_BODY_HEADER_NAMES = [
@@ -62,28 +58,9 @@ const REQUEST_BODY_HEADER_NAMES = [
"content-type",
];
-const requestBodyReaders = new SafeWeakMap();
-
-/**
- * @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args
- * @param {Uint8Array | null} body
- * @returns {{ requestRid: number, requestBodyRid: number | null, cancelHandleRid: number | null }}
- */
-function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) {
- return ops.op_fetch(
- method,
- url,
- headers,
- clientRid,
- hasBody,
- bodyLength,
- body,
- );
-}
-
/**
* @param {number} rid
- * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
+ * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number, error: string? }>}
*/
function opFetchSend(rid) {
return core.opAsync("op_fetch_send", rid);
@@ -145,154 +122,59 @@ async function mainFetch(req, recursive, terminator) {
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let reqBody = null;
-
- if (req.body !== null) {
- if (
- ObjectPrototypeIsPrototypeOf(
- ReadableStreamPrototype,
- req.body.streamOrStatic,
- )
- ) {
- if (
- req.body.length === null ||
- ObjectPrototypeIsPrototypeOf(BlobPrototype, req.body.source)
- ) {
- reqBody = req.body.stream;
+ let reqRid = null;
+
+ if (req.body) {
+ const stream = req.body.streamOrStatic;
+ const body = stream.body;
+
+ if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
+ reqBody = body;
+ } else if (typeof body === "string") {
+ reqBody = core.encode(body);
+ } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
+ const resourceBacking = getReadableStreamResourceBacking(stream);
+ if (resourceBacking) {
+ reqRid = resourceBacking.rid;
} else {
- const reader = req.body.stream.getReader();
- WeakMapPrototypeSet(requestBodyReaders, req, reader);
- const r1 = await reader.read();
- if (r1.done) {
- reqBody = new Uint8Array(0);
- } else {
- reqBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
- WeakMapPrototypeDelete(requestBodyReaders, req);
+ reqRid = resourceForReadableStream(stream, req.body.length);
}
} else {
- req.body.streamOrStatic.consumed = true;
- reqBody = req.body.streamOrStatic.body;
- // TODO(@AaronO): plumb support for StringOrBuffer all the way
- reqBody = typeof reqBody === "string" ? core.encode(reqBody) : reqBody;
+ throw TypeError("invalid body");
}
}
- const { requestRid, requestBodyRid, cancelHandleRid } = opFetch(
+ const { requestRid, cancelHandleRid } = ops.op_fetch(
req.method,
req.currentUrl(),
req.headerList,
req.clientRid,
- reqBody !== null,
- req.body?.length,
- ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, reqBody) ? reqBody : null,
+ reqBody !== null || reqRid !== null,
+ reqBody,
+ reqRid,
);
function onAbort() {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
}
- if (requestBodyRid !== null) {
- core.tryClose(requestBodyRid);
- }
}
terminator[abortSignal.add](onAbort);
-
- let requestSendError;
- let requestSendErrorSet = false;
-
- async function propagateError(err, message) {
- // TODO(lucacasonato): propagate error into response body stream
- try {
- await core.writeTypeError(requestBodyRid, message);
- } catch (err) {
- if (!requestSendErrorSet) {
- requestSendErrorSet = true;
- requestSendError = err;
- }
- }
- if (!requestSendErrorSet) {
- requestSendErrorSet = true;
- requestSendError = err;
- }
- }
-
- if (requestBodyRid !== null) {
- if (
- reqBody === null ||
- !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, reqBody)
- ) {
- throw new TypeError("Unreachable");
- }
- const reader = reqBody.getReader();
- WeakMapPrototypeSet(requestBodyReaders, req, reader);
- (async () => {
- let done = false;
- while (!done) {
- let val;
- try {
- const res = await reader.read();
- done = res.done;
- val = res.value;
- } catch (err) {
- if (terminator.aborted) break;
- await propagateError(err, "failed to read");
- break;
- }
- if (done) break;
- if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) {
- const error = new TypeError(
- "Item in request body ReadableStream is not a Uint8Array",
- );
- await reader.cancel(error);
- await propagateError(error, error.message);
- break;
- }
- try {
- await core.writeAll(requestBodyRid, val);
- } catch (err) {
- if (terminator.aborted) break;
- await reader.cancel(err);
- await propagateError(err, "failed to write");
- break;
- }
- }
- if (done && !terminator.aborted) {
- try {
- await core.shutdown(requestBodyRid);
- } catch (err) {
- if (!terminator.aborted) {
- await propagateError(err, "failed to flush");
- }
- }
- }
- WeakMapPrototypeDelete(requestBodyReaders, req);
- reader.releaseLock();
- core.tryClose(requestBodyRid);
- })();
- }
let resp;
try {
resp = await opFetchSend(requestRid);
} catch (err) {
if (terminator.aborted) return;
- if (requestSendErrorSet) {
- // if the request body stream errored, we want to propagate that error
- // instead of the original error from opFetchSend
- throw new TypeError("Failed to fetch: request body stream errored", {
- cause: requestSendError,
- });
- }
- if (requestBodyRid !== null) {
- core.tryClose(requestBodyRid);
- }
throw err;
} finally {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
}
}
+ // Re-throw any body errors
+ if (resp.error) {
+ throw new TypeError("body failed", { cause: new Error(resp.error) });
+ }
if (terminator.aborted) return abortedNetworkError();
processUrlList(req.urlList, req.urlListProcessed);
@@ -510,9 +392,8 @@ function fetch(input, init = {}) {
function abortFetch(request, responseObject, error) {
if (request.body !== null) {
- if (WeakMapPrototypeHas(requestBodyReaders, request)) {
- WeakMapPrototypeGet(requestBodyReaders, request).cancel(error);
- } else {
+ // Cancel the body if we haven't taken it as a resource yet
+ if (!request.body.streamOrStatic.locked) {
request.body.cancel(error);
}
}