summaryrefslogtreecommitdiff
path: root/extensions/fetch/26_fetch.js
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/fetch/26_fetch.js')
-rw-r--r--extensions/fetch/26_fetch.js217
1 files changed, 171 insertions, 46 deletions
diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js
index 1dd797339..6e1d4d4c8 100644
--- a/extensions/fetch/26_fetch.js
+++ b/extensions/fetch/26_fetch.js
@@ -15,14 +15,18 @@
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { byteLowerCase } = window.__bootstrap.infra;
+ const { errorReadableStream } = window.__bootstrap.streams;
const { InnerBody, extractBody } = window.__bootstrap.fetchBody;
const {
toInnerRequest,
+ toInnerResponse,
fromInnerResponse,
redirectStatus,
nullBodyStatus,
networkError,
+ abortedNetworkError,
} = window.__bootstrap.fetch;
+ const abortSignal = window.__bootstrap.abortSignal;
const REQUEST_BODY_HEADER_NAMES = [
"content-encoding",
@@ -68,10 +72,26 @@
/**
* @param {number} responseBodyRid
+ * @param {AbortSignal} [terminator]
* @returns {ReadableStream<Uint8Array>}
*/
- function createResponseBodyStream(responseBodyRid) {
- return new ReadableStream({
+ function createResponseBodyStream(responseBodyRid, terminator) {
+ function onAbort() {
+ if (readable) {
+ errorReadableStream(
+ readable,
+ new DOMException("Ongoing fetch was aborted.", "AbortError"),
+ );
+ }
+ try {
+ core.close(responseBodyRid);
+ } catch (_) {
+ // might have already been closed
+ }
+ }
+ // TODO(lucacasonato): clean up registration
+ terminator[abortSignal.add](onAbort);
+ const readable = new ReadableStream({
type: "bytes",
async pull(controller) {
try {
@@ -88,28 +108,45 @@
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
- core.close(responseBodyRid);
+ try {
+ core.close(responseBodyRid);
+ } catch (_) {
+ // might have already been closed
+ }
}
} catch (err) {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- controller.close();
- core.close(responseBodyRid);
+ if (terminator.aborted) {
+ controller.error(
+ new DOMException("Ongoing fetch was aborted.", "AbortError"),
+ );
+ } else {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ }
+ try {
+ core.close(responseBodyRid);
+ } catch (_) {
+ // might have already been closed
+ }
}
},
cancel() {
- core.close(responseBodyRid);
+ if (!terminator.aborted) {
+ terminator[abortSignal.signalAbort]();
+ }
},
});
+ return readable;
}
/**
* @param {InnerRequest} req
* @param {boolean} recursive
+ * @param {AbortSignal} terminator
* @returns {Promise<InnerResponse>}
*/
- async function mainFetch(req, recursive) {
+ async function mainFetch(req, recursive, terminator) {
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let reqBody = null;
if (req.body !== null) {
@@ -130,7 +167,7 @@
}
}
- const { requestRid, requestBodyRid } = opFetch({
+ const { requestRid, requestBodyRid, cancelHandleRid } = opFetch({
method: req.method,
url: req.currentUrl(),
headers: req.headerList,
@@ -138,6 +175,20 @@
hasBody: reqBody !== null,
}, reqBody instanceof Uint8Array ? reqBody : null);
+ function onAbort() {
+ try {
+ core.close(cancelHandleRid);
+ } catch (_) {
+ // might have already been closed
+ }
+ try {
+ core.close(requestBodyRid);
+ } catch (_) {
+ // might have already been closed
+ }
+ }
+ terminator[abortSignal.add](onAbort);
+
if (requestBodyRid !== null) {
if (reqBody === null || !(reqBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
@@ -145,24 +196,49 @@
const reader = reqBody.getReader();
(async () => {
while (true) {
- const { value, done } = await reader.read();
+ const { value, done } = await reader.read().catch((err) => {
+ if (terminator.aborted) return { done: true, value: undefined };
+ throw err;
+ });
if (done) break;
if (!(value instanceof Uint8Array)) {
await reader.cancel("value not a Uint8Array");
break;
}
try {
- await opFetchRequestWrite(requestBodyRid, value);
+ await opFetchRequestWrite(requestBodyRid, value).catch((err) => {
+ if (terminator.aborted) return;
+ throw err;
+ });
+ if (terminator.aborted) break;
} catch (err) {
await reader.cancel(err);
break;
}
}
- core.close(requestBodyRid);
+ try {
+ core.close(requestBodyRid);
+ } catch (_) {
+ // might have already been closed
+ }
})();
}
- const resp = await opFetchSend(requestRid);
+ let resp;
+ try {
+ resp = await opFetchSend(requestRid).catch((err) => {
+ if (terminator.aborted) return;
+ throw err;
+ });
+ } finally {
+ try {
+ core.close(cancelHandleRid);
+ } catch (_) {
+ // might have already been closed
+ }
+ }
+ if (terminator.aborted) return abortedNetworkError();
+
/** @type {InnerResponse} */
const response = {
headerList: resp.headers,
@@ -185,7 +261,7 @@
);
case "follow":
core.close(resp.responseRid);
- return httpRedirectFetch(req, response);
+ return httpRedirectFetch(req, response, terminator);
case "manual":
break;
}
@@ -194,7 +270,9 @@
if (nullBodyStatus(response.status)) {
core.close(resp.responseRid);
} else {
- response.body = new InnerBody(createResponseBodyStream(resp.responseRid));
+ response.body = new InnerBody(
+ createResponseBodyStream(resp.responseRid, terminator),
+ );
}
if (recursive) return response;
@@ -211,7 +289,7 @@
* @param {InnerResponse} response
* @returns {Promise<InnerResponse>}
*/
- function httpRedirectFetch(request, response) {
+ function httpRedirectFetch(request, response, terminator) {
const locationHeaders = response.headerList.filter((entry) =>
byteLowerCase(entry[0]) === "location"
);
@@ -264,43 +342,90 @@
request.body = res.body;
}
request.urlList.push(locationURL.href);
- return mainFetch(request, true);
+ return mainFetch(request, true, terminator);
}
/**
* @param {RequestInfo} input
* @param {RequestInit} init
*/
- async function fetch(input, init = {}) {
- const prefix = "Failed to call 'fetch'";
- webidl.requiredArguments(arguments.length, 1, { prefix });
- input = webidl.converters["RequestInfo"](input, {
- prefix,
- context: "Argument 1",
- });
- init = webidl.converters["RequestInit"](init, {
- prefix,
- context: "Argument 2",
- });
-
+ function fetch(input, init = {}) {
// 1.
- const requestObject = new Request(input, init);
- // 2.
- const request = toInnerRequest(requestObject);
- // 10.
- if (!requestObject.headers.has("Accept")) {
- request.headerList.push(["Accept", "*/*"]);
- }
+ const p = new Promise((resolve, reject) => {
+ const prefix = "Failed to call 'fetch'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ input = webidl.converters["RequestInfo"](input, {
+ prefix,
+ context: "Argument 1",
+ });
+ init = webidl.converters["RequestInit"](init, {
+ prefix,
+ context: "Argument 2",
+ });
- // 12.
- const response = await mainFetch(request, false);
- if (response.type === "error") {
- throw new TypeError(
- "Fetch failed: " + (response.error ?? "unknown error"),
- );
- }
+ // 2.
+ const requestObject = new Request(input, init);
+ // 3.
+ const request = toInnerRequest(requestObject);
+ // 4.
+ if (requestObject.signal.aborted) {
+ reject(abortFetch(request, null));
+ return;
+ }
- return fromInnerResponse(response, "immutable");
+ // 7.
+ let responseObject = null;
+ // 9.
+ let locallyAborted = false;
+ // 10.
+ function onabort() {
+ locallyAborted = true;
+ reject(abortFetch(request, responseObject));
+ }
+ requestObject.signal[abortSignal.add](onabort);
+
+ if (!requestObject.headers.has("Accept")) {
+ request.headerList.push(["Accept", "*/*"]);
+ }
+
+ // 12.
+ mainFetch(request, false, requestObject.signal).then((response) => {
+ // 12.1.
+ if (locallyAborted) return;
+ // 12.2.
+ if (response.aborted) {
+ reject(request, responseObject);
+ requestObject.signal[abortSignal.remove](onabort);
+ return;
+ }
+ // 12.3.
+ if (response.type === "error") {
+ const err = new TypeError(
+ "Fetch failed: " + (response.error ?? "unknown error"),
+ );
+ reject(err);
+ requestObject.signal[abortSignal.remove](onabort);
+ return;
+ }
+ responseObject = fromInnerResponse(response, "immutable");
+ resolve(responseObject);
+ requestObject.signal[abortSignal.remove](onabort);
+ }).catch((err) => {
+ reject(err);
+ requestObject.signal[abortSignal.remove](onabort);
+ });
+ });
+ return p;
+ }
+
+ function abortFetch(request, responseObject) {
+ const error = new DOMException("Ongoing fetch was aborted.", "AbortError");
+ if (request.body !== null) request.body.cancel(error);
+ if (responseObject !== null) {
+ const response = toInnerResponse(responseObject);
+ if (response.body !== null) response.body.error(error);
+ }
+ return error;
}
window.__bootstrap.fetch ??= {};